Skip to content

Commit f2f0d3f

Browse files
mashhursandsel
andauthored
dead_letter_queue.flush_check_interval new config for flushing staled segment files. (#19036)
* Validates to be min 1s to keep consistency with the docs. Introduces new config for flushing staled segment files. * Add pipeline name to the DLQ flush thread name for better visibility in the threads API results. Add suggestions from the docs review. Re-organize the duration clam logic in a way for better maintainable and fix the unit tests. * Update logstash-core/src/main/java/org/logstash/common/io/DeadLetterQueueWriter.java Remove unused method. Co-authored-by: Andrea Selva <selva.andre@gmail.com> * Move the flush chech interval to the DeadLetterQueueWriter.Builder. Remove confusing scheduler from the docs explanations. unit tests for the only newly introduced conditions. * Apply suggestions from code review Doc consistency and test rename suggestions accepted. Co-authored-by: Andrea Selva <selva.andre@gmail.com> * Keep the interval type as a Duration, rename and simplify test suites. --------- Co-authored-by: Andrea Selva <selva.andre@gmail.com>
1 parent c398724 commit f2f0d3f

13 files changed

Lines changed: 209 additions & 88 deletions

config/logstash.yml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -279,6 +279,14 @@
279279
#
280280
# dead_letter_queue.flush_interval: 5000
281281

282+
# If using dead_letter_queue.enable: true, the interval in milliseconds that the DLQ scheduler checks for stale segments
283+
# to be flushed. A smaller value ensures faster segment rotation at the cost of CPU with more frequent scheduler runs.
284+
# A larger value reduces scheduler overhead but may delay segment sealing.
285+
# Minimum value is 1000 and cannot be greater than dead_letter_queue.flush_interval.
286+
# Default is 1000.
287+
#
288+
# dead_letter_queue.flush_check_interval: 1000
289+
282290
# If using dead_letter_queue.enable: true, controls which entries should be dropped to avoid exceeding the size limit.
283291
# Set the value to `drop_newer` (default) to stop accepting new events that would push the DLQ size over the limit.
284292
# Set the value to `drop_older` to remove queue pages containing the oldest events to make space for new ones.

docs/reference/dead-letter-queues.md

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,8 +78,14 @@ Note that this value cannot be set to lower than 1000ms.
7878
dead_letter_queue.flush_interval: 5000
7979
```
8080

81+
Stale segments files are periodically checked if they need to be flushed. This period is controlled by the `dead_letter_queue.flush_check_interval` setting. This setting is in milliseconds, and defaults to 1000ms. A smaller value ensures faster segment rotation when infrequent writes occur, at the cost of CPU consumption with more frequent segment checks execution. A larger value reduces checks overhead but delays segment sealing, for the worst case it will be `dead_letter_queue.flush_interval` + `dead_letter_queue.flush_check_interval`. This value cannot be set to lower than 1000ms.
82+
83+
```yaml
84+
dead_letter_queue.flush_check_interval: 1000
85+
```
86+
8187
::::{note}
82-
You may not use the same `dead_letter_queue` path for two different Logstash instances.
88+
You cannot use the same `dead_letter_queue` path for two different Logstash instances.
8389
::::
8490

8591

logstash-core/lib/logstash/environment.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,7 @@ def self.as_java_range(r)
103103
Setting::BooleanSetting.new("dead_letter_queue.enable", false),
104104
Setting::BytesSetting.new("dead_letter_queue.max_bytes", "1024mb"),
105105
Setting::NumericSetting.new("dead_letter_queue.flush_interval", 5000),
106+
Setting::NumericSetting.new("dead_letter_queue.flush_check_interval", 1000),
106107
Setting::StringSetting.new("dead_letter_queue.storage_policy", "drop_newer", true, ["drop_newer", "drop_older"]),
107108
Setting::NullableStringSetting.new("dead_letter_queue.retain.age"), # example 5d
108109
Setting::TimeValueSetting.new("slowlog.threshold.warn", "-1"),

logstash-core/lib/logstash/settings.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ def self.included(base)
4646
"config.string",
4747
"dead_letter_queue.enable",
4848
"dead_letter_queue.flush_interval",
49+
"dead_letter_queue.flush_check_interval",
4950
"dead_letter_queue.max_bytes",
5051
"dead_letter_queue.storage_policy",
5152
"dead_letter_queue.retain.age",

logstash-core/src/main/java/org/logstash/common/DeadLetterQueueFactory.java

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -76,15 +76,16 @@ private DeadLetterQueueFactory() {
7676
* @param maxQueueSize Maximum size of the dead letter queue (in bytes). No entries will be written
7777
* that would make the size of this dlq greater than this value
7878
* @param flushInterval Maximum duration between flushes of dead letter queue files if no data is sent.
79+
* @param flushCheckInterval The interval between scheduler checks for stale segments.
7980
* @param storageType overwriting type in case of queue full: drop_older or drop_newer.
8081
* @return write manager for the specific id's dead-letter-queue context
8182
*/
82-
public static DeadLetterQueueWriter getWriter(String id, String dlqPath, long maxQueueSize, Duration flushInterval, QueueStorageType storageType) {
83-
return REGISTRY.computeIfAbsent(id, key -> newWriter(key, dlqPath, maxQueueSize, flushInterval, storageType));
83+
public static DeadLetterQueueWriter getWriter(String id, String dlqPath, long maxQueueSize, Duration flushInterval, Duration flushCheckInterval, QueueStorageType storageType) {
84+
return REGISTRY.computeIfAbsent(id, key -> newWriter(key, dlqPath, maxQueueSize, flushInterval, flushCheckInterval, storageType));
8485
}
8586

8687
/**
87-
* Like {@link #getWriter(String, String, long, Duration, QueueStorageType)} but also setting the age duration
88+
* Like {@link #getWriter(String, String, long, Duration, Duration, QueueStorageType)} but also setting the age duration
8889
* of the segments.
8990
*
9091
* @param id The identifier context for this dlq manager
@@ -93,23 +94,25 @@ public static DeadLetterQueueWriter getWriter(String id, String dlqPath, long ma
9394
* @param maxQueueSize Maximum size of the dead letter queue (in bytes). No entries will be written
9495
* that would make the size of this dlq greater than this value
9596
* @param flushInterval Maximum duration between flushes of dead letter queue files if no data is sent.
97+
* @param flushCheckInterval The interval between scheduler checks for stale segments.
9698
* @param storageType overwriting type in case of queue full: drop_older or drop_newer.
9799
* @param age the period that DLQ events should be considered as valid, before automatic removal.
98100
* @return write manager for the specific id's dead-letter-queue context
99101
* */
100-
public static DeadLetterQueueWriter getWriter(String id, String dlqPath, long maxQueueSize, Duration flushInterval, QueueStorageType storageType, Duration age) {
101-
return REGISTRY.computeIfAbsent(id, key -> newWriter(key, dlqPath, maxQueueSize, flushInterval, storageType, age));
102+
public static DeadLetterQueueWriter getWriter(String id, String dlqPath, long maxQueueSize, Duration flushInterval, Duration flushCheckInterval, QueueStorageType storageType, Duration age) {
103+
return REGISTRY.computeIfAbsent(id, key -> newWriter(key, dlqPath, maxQueueSize, flushInterval, flushCheckInterval, storageType, age));
102104
}
103105

104106
public static DeadLetterQueueWriter release(String id) {
105107
return REGISTRY.remove(id);
106108
}
107109

108110
private static DeadLetterQueueWriter newWriter(final String id, final String dlqPath, final long maxQueueSize,
109-
final Duration flushInterval, final QueueStorageType storageType) {
111+
final Duration flushInterval, final Duration flushCheckInterval, final QueueStorageType storageType) {
110112
try {
111113
return DeadLetterQueueWriter
112-
.newBuilder(Paths.get(dlqPath, id), MAX_SEGMENT_SIZE_BYTES, maxQueueSize, flushInterval)
114+
.newBuilder(Paths.get(dlqPath, id), MAX_SEGMENT_SIZE_BYTES, maxQueueSize, flushInterval, flushCheckInterval)
115+
.pipelineId(id)
113116
.storageType(storageType)
114117
.build();
115118
} catch (IOException e) {
@@ -119,11 +122,12 @@ private static DeadLetterQueueWriter newWriter(final String id, final String dlq
119122
}
120123

121124
private static DeadLetterQueueWriter newWriter(final String id, final String dlqPath, final long maxQueueSize,
122-
final Duration flushInterval, final QueueStorageType storageType,
125+
final Duration flushInterval, final Duration flushCheckInterval, final QueueStorageType storageType,
123126
final Duration age) {
124127
try {
125128
return DeadLetterQueueWriter
126-
.newBuilder(Paths.get(dlqPath, id), MAX_SEGMENT_SIZE_BYTES, maxQueueSize, flushInterval)
129+
.newBuilder(Paths.get(dlqPath, id), MAX_SEGMENT_SIZE_BYTES, maxQueueSize, flushInterval, flushCheckInterval)
130+
.pipelineId(id)
127131
.storageType(storageType)
128132
.retentionTime(age)
129133
.build();

logstash-core/src/main/java/org/logstash/common/io/DeadLetterQueueWriter.java

Lines changed: 62 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,9 @@ public String toString() {
124124
}
125125
}
126126

127+
static final Duration MIN_FLUSH_PERIOD = Duration.ofMillis(1000);
128+
static final Duration MIN_FLUSH_CHECK_INTERVAL = Duration.ofMillis(1000);
129+
127130
@VisibleForTesting
128131
static final String SEGMENT_FILE_PATTERN = "%d.log";
129132
private static final Logger logger = LogManager.getLogger(DeadLetterQueueWriter.class);
@@ -174,21 +177,25 @@ interface SchedulerService {
174177
private static class FixedRateScheduler implements SchedulerService {
175178

176179
private final ScheduledExecutorService scheduledExecutor;
180+
private final Duration flushCheckInterval;
177181

178-
FixedRateScheduler() {
182+
FixedRateScheduler(final Duration flushCheckInterval, final String pipelineId) {
183+
//Set the name with pipeline ID for better visibility
184+
final String threadName = pipelineId != null ? "dlq-flush-check[" + pipelineId + "]" : "dlq-flush-check";
185+
186+
this.flushCheckInterval = flushCheckInterval;
179187
scheduledExecutor = Executors.newScheduledThreadPool(1, r -> {
180188
Thread t = new Thread(r);
181189
//Allow this thread to die when the JVM dies
182190
t.setDaemon(true);
183-
//Set the name
184-
t.setName("dlq-flush-check");
191+
t.setName(threadName);
185192
return t;
186193
});
187194
}
188195

189196
@Override
190197
public void repeatedAction(Runnable action) {
191-
scheduledExecutor.scheduleAtFixedRate(action, 1L, 1L, TimeUnit.SECONDS);
198+
scheduledExecutor.scheduleAtFixedRate(action, flushCheckInterval.toMillis(), flushCheckInterval.toMillis(), TimeUnit.MILLISECONDS);
192199
}
193200

194201
@Override
@@ -215,21 +222,24 @@ public static final class Builder {
215222
private final long maxSegmentSize;
216223
private final long maxQueueSize;
217224
private final Duration flushInterval;
225+
private final Duration flushCheckInterval;
218226
private boolean startScheduledFlusher;
219227
private QueueStorageType storageType = QueueStorageType.DROP_NEWER;
220228
private Duration retentionTime = null;
221229
private Clock clock = Clock.systemDefaultZone();
222230
private SchedulerService customSchedulerService = null;
231+
private String pipelineId;
223232

224-
private Builder(Path queuePath, long maxSegmentSize, long maxQueueSize, Duration flushInterval) {
225-
this(queuePath, maxSegmentSize, maxQueueSize, flushInterval, true);
233+
private Builder(Path queuePath, long maxSegmentSize, long maxQueueSize, final Duration flushInterval, final Duration flushCheckInterval) {
234+
this(queuePath, maxSegmentSize, maxQueueSize, flushInterval, flushCheckInterval, true);
226235
}
227236

228-
private Builder(Path queuePath, long maxSegmentSize, long maxQueueSize, Duration flushInterval, boolean startScheduledFlusher) {
237+
private Builder(Path queuePath, long maxSegmentSize, long maxQueueSize, final Duration flushInterval, final Duration flushCheckInterval, boolean startScheduledFlusher) {
229238
this.queuePath = queuePath;
230239
this.maxSegmentSize = maxSegmentSize;
231240
this.maxQueueSize = maxQueueSize;
232241
this.flushInterval = flushInterval;
242+
this.flushCheckInterval = flushCheckInterval;
233243
this.startScheduledFlusher = startScheduledFlusher;
234244
}
235245

@@ -243,6 +253,11 @@ public Builder retentionTime(Duration retentionTime) {
243253
return this;
244254
}
245255

256+
public Builder pipelineId(final String pipelineId) {
257+
this.pipelineId = pipelineId;
258+
return this;
259+
}
260+
246261
@VisibleForTesting
247262
Builder clock(Clock clock) {
248263
this.clock = clock;
@@ -259,29 +274,64 @@ public DeadLetterQueueWriter build() throws IOException {
259274
if (customSchedulerService != null && startScheduledFlusher) {
260275
throw new IllegalArgumentException("Both default scheduler and custom scheduler were defined, ");
261276
}
277+
278+
final Duration normalizedFlushInterval = normalizeFlushInterval(flushInterval);
279+
262280
SchedulerService schedulerService;
263281
if (customSchedulerService != null) {
264282
schedulerService = customSchedulerService;
265283
} else {
266284
if (startScheduledFlusher) {
267-
schedulerService = new FixedRateScheduler();
285+
final Duration normalizedFlushCheckInterval = normalizeFlushCheckInterval(flushCheckInterval, normalizedFlushInterval);
286+
schedulerService = new FixedRateScheduler(normalizedFlushCheckInterval, pipelineId);
268287
} else {
269288
schedulerService = new NoopScheduler();
270289
}
271290
}
272291

273-
return new DeadLetterQueueWriter(queuePath, maxSegmentSize, maxQueueSize, flushInterval, storageType, retentionTime, clock, schedulerService);
292+
return new DeadLetterQueueWriter(queuePath, maxSegmentSize, maxQueueSize, normalizedFlushInterval, storageType, retentionTime, clock, schedulerService);
293+
}
294+
295+
@VisibleForTesting
296+
Duration normalizeFlushInterval(final Duration flushInterval) {
297+
if (!startScheduledFlusher) return flushInterval;
298+
299+
Duration effectiveFlushInterval = flushInterval;
300+
if (flushInterval.compareTo(MIN_FLUSH_PERIOD) < 0) {
301+
logger.warn("dead_letter_queue.flush_interval ({} ms) is below the minimum of {} ms; using {} ms",
302+
flushInterval.toMillis(), MIN_FLUSH_PERIOD.toMillis(), MIN_FLUSH_PERIOD.toMillis());
303+
effectiveFlushInterval = MIN_FLUSH_PERIOD;
304+
}
305+
return effectiveFlushInterval;
306+
}
307+
308+
@VisibleForTesting
309+
Duration normalizeFlushCheckInterval(final Duration flushCheckInterval, final Duration effectiveFlushInterval) {
310+
Duration effectiveFlushCheckInterval = flushCheckInterval;
311+
// can't exceed flush interval
312+
if (effectiveFlushCheckInterval.compareTo(effectiveFlushInterval) > 0) {
313+
logger.warn("dead_letter_queue.flush_check_interval ({} ms) cannot be greater than dead_letter_queue.flush_interval ({} ms); using {} ms",
314+
effectiveFlushCheckInterval.toMillis(), effectiveFlushInterval.toMillis(), effectiveFlushInterval.toMillis());
315+
effectiveFlushCheckInterval = effectiveFlushInterval;
316+
}
317+
// can't be less than 1s
318+
if (effectiveFlushCheckInterval.compareTo(MIN_FLUSH_CHECK_INTERVAL) < 0) {
319+
logger.warn("dead_letter_queue.flush_check_interval ({} ms) is below the minimum of {} ms; using {} ms for the flush check interval",
320+
effectiveFlushCheckInterval.toMillis(), MIN_FLUSH_CHECK_INTERVAL.toMillis(), MIN_FLUSH_CHECK_INTERVAL.toMillis());
321+
effectiveFlushCheckInterval = MIN_FLUSH_CHECK_INTERVAL;
322+
}
323+
return effectiveFlushCheckInterval;
274324
}
275325
}
276326

277327
public static Builder newBuilder(final Path queuePath, final long maxSegmentSize, final long maxQueueSize,
278-
final Duration flushInterval) {
279-
return new Builder(queuePath, maxSegmentSize, maxQueueSize, flushInterval);
328+
final Duration flushInterval, final Duration flushCheckInterval) {
329+
return new Builder(queuePath, maxSegmentSize, maxQueueSize, flushInterval, flushCheckInterval);
280330
}
281331

282332
@VisibleForTesting
283333
static Builder newBuilderWithoutFlusher(final Path queuePath, final long maxSegmentSize, final long maxQueueSize) {
284-
return new Builder(queuePath, maxSegmentSize, maxQueueSize, Duration.ZERO, false);
334+
return new Builder(queuePath, maxSegmentSize, maxQueueSize, Duration.ZERO, Duration.ZERO, false);
285335
}
286336

287337
private DeadLetterQueueWriter(final Path queuePath, final long maxSegmentSize, final long maxQueueSize,

logstash-core/src/main/java/org/logstash/common/io/RecordIOWriter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -173,7 +173,7 @@ public boolean hasWritten(){
173173
return lastWrite != null;
174174
}
175175

176-
public boolean isStale(Duration flushPeriod){
176+
public boolean isStale(final Duration flushPeriod) {
177177
return hasWritten() && Instant.now().minus(flushPeriod).isAfter(lastWrite);
178178
}
179179

logstash-core/src/main/java/org/logstash/execution/AbstractPipelineExt.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -466,15 +466,16 @@ private DeadLetterQueueWriter createDeadLetterQueueWriterFromSettings(ThreadCont
466466
String dlqPath = getSetting(context, "path.dead_letter_queue").asJavaString();
467467
long dlqMaxBytes = org.jruby.RubyNumeric.num2long(getSetting(context, "dead_letter_queue.max_bytes").convertToInteger());
468468
Duration dlqFlushInterval = Duration.ofMillis(org.jruby.RubyNumeric.num2long(getSetting(context, "dead_letter_queue.flush_interval").convertToInteger()));
469+
Duration dlqFlushCheckInterval = Duration.ofMillis(org.jruby.RubyNumeric.num2long(getSetting(context, "dead_letter_queue.flush_check_interval").convertToInteger()));
469470

470471
if (hasSetting(context, "dead_letter_queue.retain.age") && !getSetting(context, "dead_letter_queue.retain.age").isNil()) {
471472
// convert to Duration
472473
final Duration age = parseToDuration(getSetting(context, "dead_letter_queue.retain.age").convertToString().toString());
473474
return DeadLetterQueueFactory.getWriter(pipelineId.asJavaString(), dlqPath, dlqMaxBytes,
474-
dlqFlushInterval, storageType, age);
475+
dlqFlushInterval, dlqFlushCheckInterval, storageType, age);
475476
}
476477

477-
return DeadLetterQueueFactory.getWriter(pipelineId.asJavaString(), dlqPath, dlqMaxBytes, dlqFlushInterval, storageType);
478+
return DeadLetterQueueFactory.getWriter(pipelineId.asJavaString(), dlqPath, dlqMaxBytes, dlqFlushInterval, dlqFlushCheckInterval, storageType);
478479
}
479480

480481
/**

logstash-core/src/test/java/org/logstash/common/AbstractDeadLetterQueueWriterExtTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ private void writeAnEventIntoDLQ(Path dlqPath, String pluginId, String pluginTyp
6060
RubyString id = RubyString.newString(RubyUtil.RUBY, pluginId);
6161
RubyString classConfigName = RubyString.newString(RubyUtil.RUBY, pluginType);
6262

63-
final DeadLetterQueueWriter javaDlqWriter = DeadLetterQueueFactory.getWriter(dlqName, dlqPath.toString(), 1024 * 1024, Duration.ofHours(1), QueueStorageType.DROP_NEWER);
63+
final DeadLetterQueueWriter javaDlqWriter = DeadLetterQueueFactory.getWriter(dlqName, dlqPath.toString(), 1024 * 1024, Duration.ofHours(1), Duration.ofSeconds(1), QueueStorageType.DROP_NEWER);
6464
IRubyObject dlqWriter = JavaUtil.convertJavaToUsableRubyObject(context.runtime, javaDlqWriter);
6565

6666
final AbstractDeadLetterQueueWriterExt dlqWriterForInstance = new AbstractDeadLetterQueueWriterExt.PluginDeadLetterQueueWriterExt(

0 commit comments

Comments
 (0)