Skip to content

Commit 4b33cb4

Browse files
graylog-internal-actions-access[bot]patrickmannAntonEbelclaudexd4rker
authored
Honor max_event_age in cluster event periodical and improve performance (7.0) (#25514) (#25631)
* Honor max_event_age in cluster event periodical and improve performance (#25265) * honor max_event_age * CL * add validation; migrate test format to junit5 style * reduce default and min period * revise default and min value * add config param documentation --------- (cherry picked from commit a2c2f52) * Drop old cluster_events index when creating the new one The original PR replaced the compound index from (timestamp, producer, consumers) to (consumers, timestamp) but didn't remove the old index. Drop it on startup if present. * adjust for graylog collection wrapper --------- (cherry picked from commit a3e0792) Co-authored-by: Patrick Mann <patrickmann@users.noreply.github.com> Co-authored-by: Anton Ebel <anton.ebel@graylog.com> Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com> Co-authored-by: Ismail Belkacim <xd4rker@users.noreply.github.com>
1 parent 098af8d commit 4b33cb4

7 files changed

Lines changed: 118 additions & 45 deletions

File tree

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
type = "f"
2+
message = "Honor the max_event_age configuration for cluster event cleanup. Default reduced from 24h to 12h."
3+
4+
details.user = """
5+
A clean-up job runs every max_event_age (but no more often than once per hour) to remove old events from the cluster. Note that
6+
events created just after a clean-up run survive the next run and are only removed by the one after it, so the effective
7+
age of events in the cluster is between max_event_age and 2*max_event_age.
8+
The default value of max_event_age is reduced from 24h to 12h to ensure that events are cleaned up in a more timely manner.
9+
"""
10+
11+
issues = ["25259"]
12+
pulls = ["25265"]

graylog2-server/src/main/java/org/graylog2/Configuration.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import org.graylog2.cluster.lock.MongoLockService;
4040
import org.graylog2.configuration.Documentation;
4141
import org.graylog2.configuration.converters.JavaDurationConverter;
42+
import org.graylog2.configuration.validators.PositiveJavaDurationValidator;
4243
import org.graylog2.notifications.Notification;
4344
import org.graylog2.outputs.BatchSizeConfig;
4445
import org.graylog2.plugin.Tools;
@@ -298,8 +299,9 @@ public class Configuration extends CaConfiguration implements CommonNodeConfigur
298299
@Parameter(value = "global_inputs_only")
299300
private boolean globalInputsOnly = false;
300301

301-
@Parameter(value = "max_event_age", converter = JavaDurationConverter.class)
302-
private java.time.Duration maxEventAge = java.time.Duration.ofDays(1L);
302+
@Documentation("Maximum age of cluster events before cleanup. The cleanup runs at this interval, so effective event age is between max_event_age and 2*max_event_age. Minimum effective interval is 1 hour. Default: 12h")
303+
@Parameter(value = "max_event_age", converter = JavaDurationConverter.class, validators = PositiveJavaDurationValidator.class)
304+
private java.time.Duration maxEventAge = java.time.Duration.ofHours(12L);
303305

304306
public boolean maintainsStreamAwareFieldTypes() {
305307
return streamAwareFieldTypes;
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
/*
2+
* Copyright (C) 2020 Graylog, Inc.
3+
*
4+
* This program is free software: you can redistribute it and/or modify
5+
* it under the terms of the Server Side Public License, version 1,
6+
* as published by MongoDB, Inc.
7+
*
8+
* This program is distributed in the hope that it will be useful,
9+
* but WITHOUT ANY WARRANTY; without even the implied warranty of
10+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11+
* Server Side Public License for more details.
12+
*
13+
* You should have received a copy of the Server Side Public License
14+
* along with this program. If not, see
15+
* <http://www.mongodb.com/licensing/server-side-public-license>.
16+
*/
17+
package org.graylog2.configuration.validators;
18+
19+
import com.github.joschi.jadconfig.ValidationException;
20+
import com.github.joschi.jadconfig.Validator;
21+
22+
import java.time.Duration;
23+
24+
/**
 * JadConfig {@link Validator} that accepts only strictly positive {@link Duration} values.
 *
 * A {@code null}, zero, or negative duration is rejected with a {@link ValidationException}
 * whose message names the offending parameter and the value that was found.
 */
public class PositiveJavaDurationValidator implements Validator<Duration> {
25+
@Override
26+
public void validate(String name, Duration value) throws ValidationException {
27+
// null is treated the same as a non-positive duration: the parameter is invalid.
if (value == null || value.isZero() || value.isNegative()) {
28+
throw new ValidationException("Parameter " + name + " must be a positive duration (found " + value + ")");
29+
}
30+
}
31+
}

graylog2-server/src/main/java/org/graylog2/events/ClusterEventCleanupPeriodical.java

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,26 +25,33 @@
2525
import jakarta.inject.Inject;
2626
import org.graylog2.database.MongoCollections;
2727
import org.graylog2.plugin.periodical.Periodical;
28-
import org.joda.time.DateTime;
29-
import org.joda.time.DateTimeZone;
3028
import org.slf4j.Logger;
3129
import org.slf4j.LoggerFactory;
3230

31+
import java.time.Clock;
3332
import java.time.Duration;
34-
import java.util.concurrent.TimeUnit;
33+
import java.time.Instant;
3534

3635
public class ClusterEventCleanupPeriodical extends Periodical {
3736
private static final Logger LOG = LoggerFactory.getLogger(ClusterEventCleanupPeriodical.class);
3837
private static final String COLLECTION_NAME = ClusterEventPeriodical.COLLECTION_NAME;
38+
private static final long MIN_PERIOD_SECONDS = 3600;
3939

4040
private final MongoCollection<ClusterEvent> collection;
4141
private final Duration maxEventAge;
42+
private final Clock clock;
4243

4344
/**
 * Injection entry point: receives the configured {@code max_event_age} and delegates to the
 * clock-accepting constructor, supplying the system UTC clock.
 */
@Inject
4445
public ClusterEventCleanupPeriodical(MongoCollections mongoCollections, @Named("max_event_age") Duration maxEventAge) {
46+
this(mongoCollections, maxEventAge, Clock.systemUTC());
47+
}
48+
49+
/**
 * Constructor taking an explicit {@link Clock}, so tests can pin the instant from which the
 * cleanup cutoff timestamp is computed.
 */
@VisibleForTesting
50+
ClusterEventCleanupPeriodical(MongoCollections mongoCollections, Duration maxEventAge, Clock clock) {
4551
// The collection handle is configured with journaled write concern.
this.collection = mongoCollections.collection(COLLECTION_NAME, ClusterEvent.class)
4652
.withWriteConcern(WriteConcern.JOURNALED);
4753
this.maxEventAge = maxEventAge;
54+
this.clock = clock;
4855
}
4956

5057
@Override
@@ -79,7 +86,7 @@ public int getInitialDelaySeconds() {
7986

8087
@Override
8188
public int getPeriodSeconds() {
82-
return Ints.saturatedCast(TimeUnit.DAYS.toSeconds(1L));
89+
return Ints.saturatedCast(Math.max(MIN_PERIOD_SECONDS, maxEventAge.toSeconds()));
8390
}
8491

8592
@Override
@@ -92,7 +99,7 @@ public void doRun() {
9299
try {
93100
LOG.debug("Removing stale events from MongoDB collection \"{}\"", COLLECTION_NAME);
94101

95-
final long timestamp = DateTime.now(DateTimeZone.UTC).getMillis() - maxEventAge.toMillis();
102+
final long timestamp = Instant.now(clock).minus(maxEventAge).toEpochMilli();
96103
final var deleted = collection.deleteMany(Filters.lt("timestamp", timestamp)).getDeletedCount();
97104

98105
LOG.debug("Removed {} stale events from \"{}\"", deleted, COLLECTION_NAME);

graylog2-server/src/main/java/org/graylog2/events/ClusterEventPeriodical.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -79,10 +79,15 @@ static MongoCollection<ClusterEvent> prepareCollection(final MongoConnection mon
7979
.collection(COLLECTION_NAME, ClusterEvent.class)
8080
.withWriteConcern(WriteConcern.JOURNALED);
8181

82+
try {
83+
collection.dropIndex("timestamp_1_producer_1_consumers_1");
84+
} catch (MongoException ignored) {
85+
// Old index may not exist
86+
}
87+
8288
collection.createIndex(Indexes.ascending(
83-
"timestamp",
84-
"producer",
85-
"consumers"));
89+
"consumers",
90+
"timestamp"));
8691

8792
return collection;
8893
}

graylog2-server/src/test/java/org/graylog2/events/ClusterEventCleanupPeriodicalTest.java

Lines changed: 34 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -25,26 +25,27 @@
2525
import org.graylog2.database.MongoCollections;
2626
import org.graylog2.database.MongoConnection;
2727
import org.graylog2.shared.bindings.providers.ObjectMapperProvider;
28-
import org.joda.time.DateTime;
29-
import org.joda.time.DateTimeUtils;
30-
import org.joda.time.DateTimeZone;
3128
import org.junit.After;
3229
import org.junit.Before;
3330
import org.junit.Rule;
3431
import org.junit.Test;
3532
import org.mockito.junit.MockitoJUnit;
3633
import org.mockito.junit.MockitoRule;
3734

35+
import java.time.Clock;
3836
import java.time.Duration;
37+
import java.time.Instant;
38+
import java.time.ZoneOffset;
3939
import java.util.Collections;
4040

4141
import static org.assertj.core.api.Assertions.assertThat;
4242

4343
public class ClusterEventCleanupPeriodicalTest {
4444
@Rule
4545
public final MongoDBInstance mongodb = MongoDBInstance.createForClass();
46-
private static final DateTime TIME = new DateTime(2015, 4, 1, 0, 0, DateTimeZone.UTC);
46+
private static final Instant TIME = Instant.parse("2015-04-01T00:00:00Z");
4747
private static final Duration maxEventAge = Duration.ofDays(1);
48+
private static final Clock FIXED_CLOCK = Clock.fixed(TIME, ZoneOffset.UTC);
4849

4950
@Rule
5051
public final MockitoRule mockitoRule = MockitoJUnit.rule();
@@ -54,37 +55,56 @@ public class ClusterEventCleanupPeriodicalTest {
5455
private ClusterEventCleanupPeriodical clusterEventCleanupPeriodical;
5556

5657
@Before
57-
public void setUpService() throws Exception {
58-
DateTimeUtils.setCurrentMillisFixed(TIME.getMillis());
59-
58+
public void setUpService() {
6059
this.mongoConnection = mongodb.mongoConnection();
6160

6261
this.clusterEventCleanupPeriodical = new ClusterEventCleanupPeriodical(new MongoCollections(
6362
new MongoJackObjectMapperProvider(objectMapper),
64-
mongodb.mongoConnection()), maxEventAge);
63+
mongodb.mongoConnection()), maxEventAge, FIXED_CLOCK);
6564
}
6665

6766
@After
6867
public void tearDown() {
69-
DateTimeUtils.setCurrentMillisSystem();
7068
mongoConnection.getMongoDatabase().drop();
7169
}
7270

7371
@Test
74-
public void testDoRun() throws Exception {
75-
final var maxEventAgeMillis = maxEventAge.toMillis();
72+
public void testDoRun() {
73+
final long maxEventAgeMillis = maxEventAge.toMillis();
74+
final long timeMillis = TIME.toEpochMilli();
7675
final DBCollection collection = mongoConnection.getDatabase().getCollection(ClusterEventPeriodical.COLLECTION_NAME);
7776
assertThat(insertEvent(collection, 0L)).isTrue();
78-
assertThat(insertEvent(collection, TIME.getMillis())).isTrue();
79-
assertThat(insertEvent(collection, TIME.minus(maxEventAgeMillis).getMillis())).isTrue();
80-
assertThat(insertEvent(collection, TIME.minus(2 * maxEventAgeMillis).getMillis())).isTrue();
77+
assertThat(insertEvent(collection, timeMillis)).isTrue();
78+
assertThat(insertEvent(collection, timeMillis - maxEventAgeMillis)).isTrue();
79+
assertThat(insertEvent(collection, timeMillis - 2 * maxEventAgeMillis)).isTrue();
8180
assertThat(collection.count()).isEqualTo(4L);
8281

8382
clusterEventCleanupPeriodical.run();
8483

8584
assertThat(collection.count()).isEqualTo(2L);
8685
}
8786

87+
// A max event age of 12 hours must yield a period of 12 * 3600 = 43200 seconds.
@Test
88+
public void getPeriodSeconds_defaultAge_returns43200() {
89+
final var mongoCollections = new MongoCollections(new MongoJackObjectMapperProvider(objectMapper), mongodb.mongoConnection());
90+
final var periodical = new ClusterEventCleanupPeriodical(mongoCollections, Duration.ofHours(12));
91+
assertThat(periodical.getPeriodSeconds()).isEqualTo(43200);
92+
}
93+
94+
// A max event age below one hour (here 10 seconds) must be clamped to the 3600-second minimum period.
@Test
95+
public void getPeriodSeconds_shortAge_clampsToMinimum() {
96+
final var mongoCollections = new MongoCollections(new MongoJackObjectMapperProvider(objectMapper), mongodb.mongoConnection());
97+
final var periodical = new ClusterEventCleanupPeriodical(mongoCollections, Duration.ofSeconds(10));
98+
assertThat(periodical.getPeriodSeconds()).isEqualTo(3600);
99+
}
100+
101+
// A max event age above the minimum (here 2 hours) is used as the period unchanged: 7200 seconds.
@Test
102+
public void getPeriodSeconds_customAge_matchesMaxEventAge() {
103+
final var mongoCollections = new MongoCollections(new MongoJackObjectMapperProvider(objectMapper), mongodb.mongoConnection());
104+
final var periodical = new ClusterEventCleanupPeriodical(mongoCollections, Duration.ofHours(2));
105+
assertThat(periodical.getPeriodSeconds()).isEqualTo(7200);
106+
}
107+
88108
private boolean insertEvent(DBCollection collection, long timestamp) {
89109
DBObject event = new BasicDBObjectBuilder()
90110
.add("timestamp", timestamp)

0 commit comments

Comments
 (0)