From 355f18f1c76e15b9731d2dc9565ee91034cdb902 Mon Sep 17 00:00:00 2001 From: Mashhur Date: Tue, 21 Apr 2026 16:19:33 -0700 Subject: [PATCH 1/6] Validates to be min 1s to keep consistency with the docs. Introduces new config for flushing staled segment files. --- config/logstash.yml | 8 ++++ docs/reference/dead-letter-queues.md | 6 +++ logstash-core/lib/logstash/environment.rb | 1 + logstash-core/lib/logstash/settings.rb | 1 + .../common/DeadLetterQueueFactory.java | 18 ++++--- .../common/io/DeadLetterQueueWriter.java | 48 +++++++++++++++++-- .../logstash/common/io/RecordIOWriter.java | 2 +- .../execution/AbstractPipelineExt.java | 5 +- .../AbstractDeadLetterQueueWriterExtTest.java | 2 +- .../common/DeadLetterQueueFactoryTest.java | 8 ++-- .../common/io/DeadLetterQueueWriterTest.java | 45 +++++++++++++++++ 11 files changed, 124 insertions(+), 20 deletions(-) diff --git a/config/logstash.yml b/config/logstash.yml index d2903c47fd..05615b9402 100644 --- a/config/logstash.yml +++ b/config/logstash.yml @@ -271,6 +271,14 @@ # # dead_letter_queue.flush_interval: 5000 +# If using dead_letter_queue.enable: true, the interval in milliseconds that the DLQ scheduler checks for stale segments +# to be flushed. A smaller value ensures faster segment rotation at the cost of CPU with more frequent scheduler runs. +# A larger value reduces scheduler overhead but may delay segment sealing. +# Minimum value is 1000 and cannot be greater than dead_letter_queue.flush_interval. +# Default is 1000. +# +# dead_letter_queue.flush_check_interval: 1000 + # If using dead_letter_queue.enable: true, controls which entries should be dropped to avoid exceeding the size limit. # Set the value to `drop_newer` (default) to stop accepting new events that would push the DLQ size over the limit. # Set the value to `drop_older` to remove queue pages containing the oldest events to make space for new ones. diff --git a/docs/reference/dead-letter-queues.md b/docs/reference/dead-letter-queues.md index c1e35afb65..924a54cc4a 100644 --- a/docs/reference/dead-letter-queues.md +++ b/docs/reference/dead-letter-queues.md @@ -78,6 +78,12 @@ Note that this value cannot be set to lower than 1000ms. dead_letter_queue.flush_interval: 5000 ``` +Dead letter queue scheduler checks are controlled by the `dead_letter_queue.flush_check_interval` setting. This setting controls how frequently the background scheduler checks for stale segments that need to be flushed. This setting is in milliseconds, and defaults to 1000ms. A smaller value ensures faster segment rotation when infrequent writes occur, at the cost of CPU consumption with more frequent scheduler execution. A larger value reduces scheduler overhead but may delay segment sealing, for the worst case it will be `dead_letter_queue.flush_interval` + `dead_letter_queue.flush_check_interval`. This value cannot be set to lower than 1000ms. + +```yaml +dead_letter_queue.flush_check_interval: 1000 +``` + ::::{note} You may not use the same `dead_letter_queue` path for two different Logstash instances. :::: diff --git a/logstash-core/lib/logstash/environment.rb b/logstash-core/lib/logstash/environment.rb index 5ceeca979f..c5bf204035 100644 --- a/logstash-core/lib/logstash/environment.rb +++ b/logstash-core/lib/logstash/environment.rb @@ -102,6 +102,7 @@ def self.as_java_range(r) Setting::BooleanSetting.new("dead_letter_queue.enable", false), Setting::BytesSetting.new("dead_letter_queue.max_bytes", "1024mb"), Setting::NumericSetting.new("dead_letter_queue.flush_interval", 5000), + Setting::NumericSetting.new("dead_letter_queue.flush_check_interval", 1000), Setting::StringSetting.new("dead_letter_queue.storage_policy", "drop_newer", true, ["drop_newer", "drop_older"]), Setting::NullableStringSetting.new("dead_letter_queue.retain.age"), # example 5d Setting::TimeValueSetting.new("slowlog.threshold.warn", "-1"), diff --git a/logstash-core/lib/logstash/settings.rb b/logstash-core/lib/logstash/settings.rb index e706792027..5e7021cde9 100644 --- a/logstash-core/lib/logstash/settings.rb +++ b/logstash-core/lib/logstash/settings.rb @@ -46,6 +46,7 @@ def self.included(base) "config.string", "dead_letter_queue.enable", "dead_letter_queue.flush_interval", + "dead_letter_queue.flush_check_interval", "dead_letter_queue.max_bytes", "dead_letter_queue.storage_policy", "dead_letter_queue.retain.age", diff --git a/logstash-core/src/main/java/org/logstash/common/DeadLetterQueueFactory.java b/logstash-core/src/main/java/org/logstash/common/DeadLetterQueueFactory.java index 16fbc0c5b2..84e2e77af0 100644 --- a/logstash-core/src/main/java/org/logstash/common/DeadLetterQueueFactory.java +++ b/logstash-core/src/main/java/org/logstash/common/DeadLetterQueueFactory.java @@ -76,15 +76,16 @@ private DeadLetterQueueFactory() { * @param maxQueueSize Maximum size of the dead letter queue (in bytes). No entries will be written * that would make the size of this dlq greater than this value * @param flushInterval Maximum duration between flushes of dead letter queue files if no data is sent. + * @param flushCheckInterval The interval between scheduler checks for stale segments. * @param storageType overwriting type in case of queue full: drop_older or drop_newer. * @return write manager for the specific id's dead-letter-queue context */ - public static DeadLetterQueueWriter getWriter(String id, String dlqPath, long maxQueueSize, Duration flushInterval, QueueStorageType storageType) { - return REGISTRY.computeIfAbsent(id, key -> newWriter(key, dlqPath, maxQueueSize, flushInterval, storageType)); + public static DeadLetterQueueWriter getWriter(String id, String dlqPath, long maxQueueSize, Duration flushInterval, Duration flushCheckInterval, QueueStorageType storageType) { + return REGISTRY.computeIfAbsent(id, key -> newWriter(key, dlqPath, maxQueueSize, flushInterval, flushCheckInterval, storageType)); } /** - * Like {@link #getWriter(String, String, long, Duration, QueueStorageType)} but also setting the age duration + * Like {@link #getWriter(String, String, long, Duration, Duration, QueueStorageType)} but also setting the age duration * of the segments. * * @param id The identifier context for this dlq manager @@ -93,12 +94,13 @@ public static DeadLetterQueueWriter getWriter(String id, String dlqPath, long ma * @param maxQueueSize Maximum size of the dead letter queue (in bytes). No entries will be written * that would make the size of this dlq greater than this value * @param flushInterval Maximum duration between flushes of dead letter queue files if no data is sent. + * @param flushCheckInterval The interval between scheduler checks for stale segments. * @param storageType overwriting type in case of queue full: drop_older or drop_newer. * @param age the period that DLQ events should be considered as valid, before automatic removal. * @return write manager for the specific id's dead-letter-queue context * */ - public static DeadLetterQueueWriter getWriter(String id, String dlqPath, long maxQueueSize, Duration flushInterval, QueueStorageType storageType, Duration age) { - return REGISTRY.computeIfAbsent(id, key -> newWriter(key, dlqPath, maxQueueSize, flushInterval, storageType, age)); + public static DeadLetterQueueWriter getWriter(String id, String dlqPath, long maxQueueSize, Duration flushInterval, Duration flushCheckInterval, QueueStorageType storageType, Duration age) { + return REGISTRY.computeIfAbsent(id, key -> newWriter(key, dlqPath, maxQueueSize, flushInterval, flushCheckInterval, storageType, age)); } public static DeadLetterQueueWriter release(String id) { @@ -106,10 +108,11 @@ public static DeadLetterQueueWriter release(String id) { } private static DeadLetterQueueWriter newWriter(final String id, final String dlqPath, final long maxQueueSize, - final Duration flushInterval, final QueueStorageType storageType) { + final Duration flushInterval, final Duration flushCheckInterval, final QueueStorageType storageType) { try { return DeadLetterQueueWriter .newBuilder(Paths.get(dlqPath, id), MAX_SEGMENT_SIZE_BYTES, maxQueueSize, flushInterval) + .flushCheckInterval(flushCheckInterval) .storageType(storageType) .build(); } catch (IOException e) { @@ -119,11 +122,12 @@ private static DeadLetterQueueWriter newWriter(final String id, final String dlq } private static DeadLetterQueueWriter newWriter(final String id, final String dlqPath, final long maxQueueSize, - final Duration flushInterval, final QueueStorageType storageType, + final Duration flushInterval, final Duration flushCheckInterval, final QueueStorageType storageType, final Duration age) { try { return DeadLetterQueueWriter .newBuilder(Paths.get(dlqPath, id), MAX_SEGMENT_SIZE_BYTES, maxQueueSize, flushInterval) + .flushCheckInterval(flushCheckInterval) .storageType(storageType) .retentionTime(age) .build(); diff --git a/logstash-core/src/main/java/org/logstash/common/io/DeadLetterQueueWriter.java b/logstash-core/src/main/java/org/logstash/common/io/DeadLetterQueueWriter.java index 23c4e7e8f2..629cb39462 100644 --- a/logstash-core/src/main/java/org/logstash/common/io/DeadLetterQueueWriter.java +++ b/logstash-core/src/main/java/org/logstash/common/io/DeadLetterQueueWriter.java @@ -124,6 +124,8 @@ public String toString() { } } + static final Duration MIN_FLUSH_INTERVAL = Duration.ofMillis(1000); + @VisibleForTesting static final String SEGMENT_FILE_PATTERN = "%d.log"; private static final Logger logger = LogManager.getLogger(DeadLetterQueueWriter.class); @@ -142,6 +144,7 @@ public String toString() { private volatile int currentSegmentIndex; private volatile Timestamp lastEntryTimestamp; private final Duration flushInterval; + private final Duration flushCheckInterval; private final AtomicBoolean open = new AtomicBoolean(true); private final LongAdder droppedEvents = new LongAdder(); private final LongAdder expiredEvents = new LongAdder(); @@ -173,9 +176,12 @@ interface SchedulerService { private static class FixedRateScheduler implements SchedulerService { + private static final long MIN_CHECK_INTERVAL_MS = 1000L; private final ScheduledExecutorService scheduledExecutor; + private final long checkIntervalMs; - FixedRateScheduler() { + FixedRateScheduler(final Duration flushCheckInterval) { + this.checkIntervalMs = Math.max(flushCheckInterval.toMillis(), MIN_CHECK_INTERVAL_MS); scheduledExecutor = Executors.newScheduledThreadPool(1, r -> { Thread t = new Thread(r); //Allow this thread to die when the JVM dies @@ -188,7 +194,7 @@ private static class FixedRateScheduler implements SchedulerService { @Override public void repeatedAction(Runnable action) { - scheduledExecutor.scheduleAtFixedRate(action, 1L, 1L, TimeUnit.SECONDS); + scheduledExecutor.scheduleAtFixedRate(action, checkIntervalMs, checkIntervalMs, TimeUnit.MILLISECONDS); } @Override @@ -218,6 +224,7 @@ public static final class Builder { private boolean startScheduledFlusher; private QueueStorageType storageType = QueueStorageType.DROP_NEWER; private Duration retentionTime = null; + private Duration flushCheckInterval = Duration.ofMillis(1000); private Clock clock = Clock.systemDefaultZone(); private SchedulerService customSchedulerService = null; @@ -243,6 +250,11 @@ public Builder retentionTime(Duration retentionTime) { return this; } + public Builder flushCheckInterval(final Duration interval) { + this.flushCheckInterval = interval; + return this; + } + @VisibleForTesting Builder clock(Clock clock) { this.clock = clock; @@ -259,18 +271,33 @@ public DeadLetterQueueWriter build() throws IOException { if (customSchedulerService != null && startScheduledFlusher) { throw new IllegalArgumentException("Both default scheduler and custom scheduler were defined, "); } + + Duration effectiveFlushInterval = flushInterval; + if (startScheduledFlusher && flushInterval.compareTo(MIN_FLUSH_INTERVAL) < 0) { + logger.warn("dead_letter_queue.flush_interval ({} ms) is below the minimum of {} ms; using {} ms", + flushInterval.toMillis(), MIN_FLUSH_INTERVAL.toMillis(), MIN_FLUSH_INTERVAL.toMillis()); + effectiveFlushInterval = MIN_FLUSH_INTERVAL; + } + + Duration effectiveFlushCheckInterval = flushCheckInterval; + if (startScheduledFlusher && flushCheckInterval.compareTo(effectiveFlushInterval) > 0) { + logger.warn("dead_letter_queue.flush_check_interval ({} ms) cannot be greater than dead_letter_queue.flush_interval ({} ms); using {} ms", + flushCheckInterval.toMillis(), effectiveFlushInterval.toMillis(), effectiveFlushInterval.toMillis()); + effectiveFlushCheckInterval = effectiveFlushInterval; + } + SchedulerService schedulerService; if (customSchedulerService != null) { schedulerService = customSchedulerService; } else { if (startScheduledFlusher) { - schedulerService = new FixedRateScheduler(); + schedulerService = new FixedRateScheduler(effectiveFlushCheckInterval); } else { schedulerService = new NoopScheduler(); } } - return new DeadLetterQueueWriter(queuePath, maxSegmentSize, maxQueueSize, flushInterval, storageType, retentionTime, clock, schedulerService); + return new DeadLetterQueueWriter(queuePath, maxSegmentSize, maxQueueSize, effectiveFlushInterval, effectiveFlushCheckInterval, storageType, retentionTime, clock, schedulerService); } } @@ -285,7 +312,7 @@ static Builder newBuilderWithoutFlusher(final Path queuePath, final long maxSegm } private DeadLetterQueueWriter(final Path queuePath, final long maxSegmentSize, final long maxQueueSize, - final Duration flushInterval, final QueueStorageType storageType, final Duration retentionTime, + final Duration flushInterval, final Duration flushCheckInterval, final QueueStorageType storageType, final Duration retentionTime, final Clock clock, SchedulerService flusherService) throws IOException { this.clock = clock; @@ -295,6 +322,7 @@ private DeadLetterQueueWriter(final Path queuePath, final long maxSegmentSize, f this.maxQueueSize = maxQueueSize; this.storageType = storageType; this.flushInterval = flushInterval; + this.flushCheckInterval = flushCheckInterval; this.currentQueueSize = new AtomicLong(computeQueueSize()); this.retentionTime = retentionTime; @@ -372,6 +400,11 @@ public long getCurrentQueueSize() { return currentQueueSize.longValue(); } + @VisibleForTesting + public Duration flushCheckInterval() { + return flushCheckInterval; + } + public String getStoragePolicy() { return storageType.name().toLowerCase(Locale.ROOT); } @@ -647,6 +680,11 @@ private static boolean alreadyProcessed(final Event event) { return event.includes(DEAD_LETTER_QUEUE_METADATA_KEY); } + @VisibleForTesting + public void runScheduledFlushCheck() { + scheduledFlushCheck(); + } + // main method for flush scheduler private void scheduledFlushCheck() { logger.trace("Running scheduled check"); diff --git a/logstash-core/src/main/java/org/logstash/common/io/RecordIOWriter.java b/logstash-core/src/main/java/org/logstash/common/io/RecordIOWriter.java index 34d3b5024f..d529e97e34 100644 --- a/logstash-core/src/main/java/org/logstash/common/io/RecordIOWriter.java +++ b/logstash-core/src/main/java/org/logstash/common/io/RecordIOWriter.java @@ -173,7 +173,7 @@ public boolean hasWritten(){ return lastWrite != null; } - public boolean isStale(Duration flushPeriod){ + public boolean isStale(final Duration flushPeriod) { return hasWritten() && Instant.now().minus(flushPeriod).isAfter(lastWrite); } diff --git a/logstash-core/src/main/java/org/logstash/execution/AbstractPipelineExt.java b/logstash-core/src/main/java/org/logstash/execution/AbstractPipelineExt.java index 1ad9d75117..35d1e8748d 100644 --- a/logstash-core/src/main/java/org/logstash/execution/AbstractPipelineExt.java +++ b/logstash-core/src/main/java/org/logstash/execution/AbstractPipelineExt.java @@ -466,15 +466,16 @@ private DeadLetterQueueWriter createDeadLetterQueueWriterFromSettings(ThreadCont String dlqPath = getSetting(context, "path.dead_letter_queue").asJavaString(); long dlqMaxBytes = org.jruby.RubyNumeric.num2long(getSetting(context, "dead_letter_queue.max_bytes").convertToInteger()); Duration dlqFlushInterval = Duration.ofMillis(org.jruby.RubyNumeric.num2long(getSetting(context, "dead_letter_queue.flush_interval").convertToInteger())); + Duration dlqFlushCheckInterval = Duration.ofMillis(org.jruby.RubyNumeric.num2long(getSetting(context, "dead_letter_queue.flush_check_interval").convertToInteger())); if (hasSetting(context, "dead_letter_queue.retain.age") && !getSetting(context, "dead_letter_queue.retain.age").isNil()) { // convert to Duration final Duration age = parseToDuration(getSetting(context, "dead_letter_queue.retain.age").convertToString().toString()); return DeadLetterQueueFactory.getWriter(pipelineId.asJavaString(), dlqPath, dlqMaxBytes, - dlqFlushInterval, storageType, age); + dlqFlushInterval, dlqFlushCheckInterval, storageType, age); } - return DeadLetterQueueFactory.getWriter(pipelineId.asJavaString(), dlqPath, dlqMaxBytes, dlqFlushInterval, storageType); + return DeadLetterQueueFactory.getWriter(pipelineId.asJavaString(), dlqPath, dlqMaxBytes, dlqFlushInterval, dlqFlushCheckInterval, storageType); } /** diff --git a/logstash-core/src/test/java/org/logstash/common/AbstractDeadLetterQueueWriterExtTest.java b/logstash-core/src/test/java/org/logstash/common/AbstractDeadLetterQueueWriterExtTest.java index 335dec971f..8e87bf7eb1 100644 --- a/logstash-core/src/test/java/org/logstash/common/AbstractDeadLetterQueueWriterExtTest.java +++ b/logstash-core/src/test/java/org/logstash/common/AbstractDeadLetterQueueWriterExtTest.java @@ -60,7 +60,7 @@ private void writeAnEventIntoDLQ(Path dlqPath, String pluginId, String pluginTyp RubyString id = RubyString.newString(RubyUtil.RUBY, pluginId); RubyString classConfigName = RubyString.newString(RubyUtil.RUBY, pluginType); - final DeadLetterQueueWriter javaDlqWriter = DeadLetterQueueFactory.getWriter(dlqName, dlqPath.toString(), 1024 * 1024, Duration.ofHours(1), QueueStorageType.DROP_NEWER); + final DeadLetterQueueWriter javaDlqWriter = DeadLetterQueueFactory.getWriter(dlqName, dlqPath.toString(), 1024 * 1024, Duration.ofHours(1), Duration.ofSeconds(1), QueueStorageType.DROP_NEWER); IRubyObject dlqWriter = JavaUtil.convertJavaToUsableRubyObject(context.runtime, javaDlqWriter); final AbstractDeadLetterQueueWriterExt dlqWriterForInstance = new AbstractDeadLetterQueueWriterExt.PluginDeadLetterQueueWriterExt( diff --git a/logstash-core/src/test/java/org/logstash/common/DeadLetterQueueFactoryTest.java b/logstash-core/src/test/java/org/logstash/common/DeadLetterQueueFactoryTest.java index cf85824d7a..6802d97849 100644 --- a/logstash-core/src/test/java/org/logstash/common/DeadLetterQueueFactoryTest.java +++ b/logstash-core/src/test/java/org/logstash/common/DeadLetterQueueFactoryTest.java @@ -69,9 +69,9 @@ public void setUp() throws Exception { public void testSameBeforeRelease() throws IOException { try { Path pipelineA = dir.resolve(PIPELINE_NAME); - DeadLetterQueueWriter writer = DeadLetterQueueFactory.getWriter(PIPELINE_NAME, pipelineA.toString(), 10000, Duration.ofSeconds(1), QueueStorageType.DROP_NEWER); + DeadLetterQueueWriter writer = DeadLetterQueueFactory.getWriter(PIPELINE_NAME, pipelineA.toString(), 10000, Duration.ofSeconds(1), Duration.ofSeconds(1), QueueStorageType.DROP_NEWER); assertTrue(writer.isOpen()); - DeadLetterQueueWriter writer2 = DeadLetterQueueFactory.getWriter(PIPELINE_NAME, pipelineA.toString(), 10000, Duration.ofSeconds(1), QueueStorageType.DROP_NEWER); + DeadLetterQueueWriter writer2 = DeadLetterQueueFactory.getWriter(PIPELINE_NAME, pipelineA.toString(), 10000, Duration.ofSeconds(1), Duration.ofSeconds(1), QueueStorageType.DROP_NEWER); assertSame(writer, writer2); writer.close(); } finally { @@ -83,11 +83,11 @@ public void testSameBeforeRelease() throws IOException { public void testOpenableAfterRelease() throws IOException { try { Path pipelineA = dir.resolve(PIPELINE_NAME); - DeadLetterQueueWriter writer = DeadLetterQueueFactory.getWriter(PIPELINE_NAME, pipelineA.toString(), 10000, Duration.ofSeconds(1), QueueStorageType.DROP_NEWER); + DeadLetterQueueWriter writer = DeadLetterQueueFactory.getWriter(PIPELINE_NAME, pipelineA.toString(), 10000, Duration.ofSeconds(1), Duration.ofSeconds(1), QueueStorageType.DROP_NEWER); assertTrue(writer.isOpen()); writer.close(); DeadLetterQueueFactory.release(PIPELINE_NAME); - writer = DeadLetterQueueFactory.getWriter(PIPELINE_NAME, pipelineA.toString(), 10000, Duration.ofSeconds(1), QueueStorageType.DROP_NEWER); + writer = DeadLetterQueueFactory.getWriter(PIPELINE_NAME, pipelineA.toString(), 10000, Duration.ofSeconds(1), Duration.ofSeconds(1), QueueStorageType.DROP_NEWER); assertTrue(writer.isOpen()); writer.close(); }finally{ diff --git a/logstash-core/src/test/java/org/logstash/common/io/DeadLetterQueueWriterTest.java b/logstash-core/src/test/java/org/logstash/common/io/DeadLetterQueueWriterTest.java index 1e507bcdfe..c2ac7af6f9 100644 --- a/logstash-core/src/test/java/org/logstash/common/io/DeadLetterQueueWriterTest.java +++ b/logstash-core/src/test/java/org/logstash/common/io/DeadLetterQueueWriterTest.java @@ -616,4 +616,49 @@ private long countDlqSegments(Path dir) throws IOException { } } + @Test + public void testFlushCheckIntervalIsRespected() throws IOException { + final Duration flushCheckInterval = Duration.ofMillis(500); + DeadLetterQueueWriter writer = DeadLetterQueueWriter + .newBuilder(dir, 1_000, 100_000, Duration.ofSeconds(5)) + .flushCheckInterval(flushCheckInterval) + .build(); + + assertEquals("flushCheckInterval should be applied to the writer", flushCheckInterval.toMillis(), + writer.flushCheckInterval().toMillis()); + + writer.close(); + } + + @Test + public void testFlushCheckIntervalClampedToMinimum() throws IOException { + final Duration belowMinimum = Duration.ofMillis(500); + DeadLetterQueueWriter writer = DeadLetterQueueWriter + .newBuilder(dir, 1_000, 100_000, Duration.ofSeconds(5)) + .flushCheckInterval(belowMinimum) + .build(); + + // The writer should enforce a minimum of 1000ms in the scheduler + assertTrue("flushCheckInterval below 1000ms should be clamped to 1000ms", + writer.flushCheckInterval().toMillis() >= 1000); + + writer.close(); + } + + @Test + public void testFlushCheckIntervalClampedToFlushInterval() throws IOException { + final Duration flushInterval = Duration.ofSeconds(3); + final Duration aboveFlushInterval = Duration.ofSeconds(5); + DeadLetterQueueWriter writer = DeadLetterQueueWriter + .newBuilder(dir, 1_000, 100_000, flushInterval) + .flushCheckInterval(aboveFlushInterval) + .build(); + + // The writer should clamp flushCheckInterval to not exceed flushInterval + assertTrue("flushCheckInterval should not exceed flushInterval", + writer.flushCheckInterval().compareTo(flushInterval) <= 0); + + writer.close(); + } + } From ab4f453468412359560da5981044394095841340 Mon Sep 17 00:00:00 2001 From: Mashhur Date: Wed, 22 Apr 2026 13:58:42 -0700 Subject: [PATCH 2/6] Add pipeline name to the DLQ flush thread name for better visibility in the threads API results. Add suggestions from the docs review. Re-organize the duration clam logic in a way for better maintainable and fix the unit tests. --- docs/reference/dead-letter-queues.md | 4 +- .../common/DeadLetterQueueFactory.java | 2 + .../common/io/DeadLetterQueueWriter.java | 45 +++++++++++++------ .../common/io/DeadLetterQueueWriterTest.java | 6 +-- 4 files changed, 38 insertions(+), 19 deletions(-) diff --git a/docs/reference/dead-letter-queues.md b/docs/reference/dead-letter-queues.md index 924a54cc4a..71bc18c37d 100644 --- a/docs/reference/dead-letter-queues.md +++ b/docs/reference/dead-letter-queues.md @@ -78,14 +78,14 @@ Note that this value cannot be set to lower than 1000ms. dead_letter_queue.flush_interval: 5000 ``` -Dead letter queue scheduler checks are controlled by the `dead_letter_queue.flush_check_interval` setting. This setting controls how frequently the background scheduler checks for stale segments that need to be flushed. This setting is in milliseconds, and defaults to 1000ms. A smaller value ensures faster segment rotation when infrequent writes occur, at the cost of CPU consumption with more frequent scheduler execution. A larger value reduces scheduler overhead but may delay segment sealing, for the worst case it will be `dead_letter_queue.flush_interval` + `dead_letter_queue.flush_check_interval`. This value cannot be set to lower than 1000ms. +Dead letter queue scheduler checks are controlled by the `dead_letter_queue.flush_check_interval` setting. This setting controls how frequently the background scheduler checks for stale segments that need to be flushed. This setting is in milliseconds, and defaults to 1000ms. A smaller value ensures faster segment rotation when infrequent writes occur, at the cost of CPU consumption with more frequent scheduler execution. A larger value reduces scheduler overhead but delays segment sealing, for the worst case it will be `dead_letter_queue.flush_interval` + `dead_letter_queue.flush_check_interval`. This value cannot be set to lower than 1000ms. ```yaml dead_letter_queue.flush_check_interval: 1000 ``` ::::{note} -You may not use the same `dead_letter_queue` path for two different Logstash instances. +You cannot use the same `dead_letter_queue` path for two different Logstash instances. :::: diff --git a/logstash-core/src/main/java/org/logstash/common/DeadLetterQueueFactory.java b/logstash-core/src/main/java/org/logstash/common/DeadLetterQueueFactory.java index 84e2e77af0..45d9596083 100644 --- a/logstash-core/src/main/java/org/logstash/common/DeadLetterQueueFactory.java +++ b/logstash-core/src/main/java/org/logstash/common/DeadLetterQueueFactory.java @@ -113,6 +113,7 @@ private static DeadLetterQueueWriter newWriter(final String id, final String dlq return DeadLetterQueueWriter .newBuilder(Paths.get(dlqPath, id), MAX_SEGMENT_SIZE_BYTES, maxQueueSize, flushInterval) .flushCheckInterval(flushCheckInterval) + .pipelineId(id) .storageType(storageType) .build(); } catch (IOException e) { @@ -128,6 +129,7 @@ private static DeadLetterQueueWriter newWriter(final String id, final String dlq return DeadLetterQueueWriter .newBuilder(Paths.get(dlqPath, id), MAX_SEGMENT_SIZE_BYTES, maxQueueSize, flushInterval) .flushCheckInterval(flushCheckInterval) + .pipelineId(id) .storageType(storageType) .retentionTime(age) .build(); diff --git a/logstash-core/src/main/java/org/logstash/common/io/DeadLetterQueueWriter.java b/logstash-core/src/main/java/org/logstash/common/io/DeadLetterQueueWriter.java index 629cb39462..d3fbbe2f9a 100644 --- a/logstash-core/src/main/java/org/logstash/common/io/DeadLetterQueueWriter.java +++ b/logstash-core/src/main/java/org/logstash/common/io/DeadLetterQueueWriter.java @@ -124,7 +124,8 @@ public String toString() { } } - static final Duration MIN_FLUSH_INTERVAL = Duration.ofMillis(1000); + static final Duration MIN_FLUSH_PERIOD = Duration.ofMillis(1000); + static final Duration MIN_FLUSH_CHECK_INTERVAL = Duration.ofMillis(1000); @VisibleForTesting static final String SEGMENT_FILE_PATTERN = "%d.log"; @@ -176,18 +177,19 @@ interface SchedulerService { private static class FixedRateScheduler implements SchedulerService { - private static final long MIN_CHECK_INTERVAL_MS = 1000L; private final ScheduledExecutorService scheduledExecutor; private final long checkIntervalMs; - FixedRateScheduler(final Duration flushCheckInterval) { - this.checkIntervalMs = Math.max(flushCheckInterval.toMillis(), MIN_CHECK_INTERVAL_MS); + FixedRateScheduler(final Duration flushCheckInterval, final String pipelineId) { + //Set the name with pipeline ID for better visibility + final String threadName = pipelineId != null ? "dlq-flush-check[" + pipelineId + "]" : "dlq-flush-check"; + + this.checkIntervalMs = Math.max(flushCheckInterval.toMillis(), MIN_FLUSH_CHECK_INTERVAL.toMillis()); scheduledExecutor = Executors.newScheduledThreadPool(1, r -> { Thread t = new Thread(r); //Allow this thread to die when the JVM dies t.setDaemon(true); - //Set the name - t.setName("dlq-flush-check"); + t.setName(threadName); return t; }); } @@ -227,6 +229,7 @@ public static final class Builder { private Duration flushCheckInterval = Duration.ofMillis(1000); private Clock clock = Clock.systemDefaultZone(); private SchedulerService customSchedulerService = null; + private String pipelineId; private Builder(Path queuePath, long maxSegmentSize, long maxQueueSize, Duration flushInterval) { this(queuePath, maxSegmentSize, maxQueueSize, flushInterval, true); @@ -255,6 +258,11 @@ public Builder flushCheckInterval(final Duration interval) { return this; } + public Builder pipelineId(final String pipelineId) { + this.pipelineId = pipelineId; + return this; + } + @VisibleForTesting Builder clock(Clock clock) { this.clock = clock; @@ -273,17 +281,25 @@ public DeadLetterQueueWriter build() throws IOException { } Duration effectiveFlushInterval = flushInterval; - if (startScheduledFlusher && flushInterval.compareTo(MIN_FLUSH_INTERVAL) < 0) { + if (startScheduledFlusher && flushInterval.compareTo(MIN_FLUSH_PERIOD) < 0) { logger.warn("dead_letter_queue.flush_interval ({} ms) is below the minimum of {} ms; using {} ms", - flushInterval.toMillis(), MIN_FLUSH_INTERVAL.toMillis(), MIN_FLUSH_INTERVAL.toMillis()); - effectiveFlushInterval = MIN_FLUSH_INTERVAL; + flushInterval.toMillis(), MIN_FLUSH_PERIOD.toMillis(), MIN_FLUSH_PERIOD.toMillis()); + effectiveFlushInterval = MIN_FLUSH_PERIOD; } Duration effectiveFlushCheckInterval = flushCheckInterval; - if (startScheduledFlusher && flushCheckInterval.compareTo(effectiveFlushInterval) > 0) { - logger.warn("dead_letter_queue.flush_check_interval ({} ms) cannot be greater than dead_letter_queue.flush_interval ({} ms); using {} ms", - flushCheckInterval.toMillis(), effectiveFlushInterval.toMillis(), effectiveFlushInterval.toMillis()); - effectiveFlushCheckInterval = effectiveFlushInterval; + if (startScheduledFlusher) { + // Clamp to maximum (can't exceed flush interval) + if (effectiveFlushCheckInterval.compareTo(effectiveFlushInterval) > 0) { + logger.warn("dead_letter_queue.flush_check_interval ({} ms) cannot be greater than dead_letter_queue.flush_interval ({} ms); using {} ms", + effectiveFlushCheckInterval.toMillis(), effectiveFlushInterval.toMillis(), effectiveFlushInterval.toMillis()); + effectiveFlushCheckInterval = effectiveFlushInterval; + } + // Clamp to minimum (for scheduler only, don't change stored value) + if (effectiveFlushCheckInterval.compareTo(MIN_FLUSH_CHECK_INTERVAL) < 0) { + logger.warn("dead_letter_queue.flush_check_interval ({} ms) is below the minimum of {} ms; using {} ms for the scheduler", + effectiveFlushCheckInterval.toMillis(), MIN_FLUSH_CHECK_INTERVAL.toMillis(), MIN_FLUSH_CHECK_INTERVAL.toMillis()); + } } SchedulerService schedulerService; @@ -291,7 +307,8 @@ public DeadLetterQueueWriter build() throws IOException { schedulerService = customSchedulerService; } else { if (startScheduledFlusher) { - schedulerService = new FixedRateScheduler(effectiveFlushCheckInterval); + // Use clamped-to-max value for scheduler (which will further clamp to MIN in FixedRateScheduler) + schedulerService = new FixedRateScheduler(effectiveFlushCheckInterval, pipelineId); } else { schedulerService = new NoopScheduler(); } diff --git a/logstash-core/src/test/java/org/logstash/common/io/DeadLetterQueueWriterTest.java b/logstash-core/src/test/java/org/logstash/common/io/DeadLetterQueueWriterTest.java index c2ac7af6f9..9b886a847f 100644 --- a/logstash-core/src/test/java/org/logstash/common/io/DeadLetterQueueWriterTest.java +++ b/logstash-core/src/test/java/org/logstash/common/io/DeadLetterQueueWriterTest.java @@ -638,9 +638,9 @@ public void testFlushCheckIntervalClampedToMinimum() throws IOException { .flushCheckInterval(belowMinimum) .build(); - // The writer should enforce a minimum of 1000ms in the scheduler - assertTrue("flushCheckInterval below 1000ms should be clamped to 1000ms", - writer.flushCheckInterval().toMillis() >= 1000); + // The writer stores the configured value as-is; the scheduler enforces a minimum of 1000ms internally + assertEquals("flushCheckInterval should be stored as configured", + belowMinimum.toMillis(), writer.flushCheckInterval().toMillis()); writer.close(); } From 76741de35b6d6dc3d192d467b11a0882e167f82b Mon Sep 17 00:00:00 2001 From: Mashhur <99575341+mashhurs@users.noreply.github.com> Date: Tue, 28 Apr 2026 10:35:52 -0700 Subject: [PATCH 3/6] Update logstash-core/src/main/java/org/logstash/common/io/DeadLetterQueueWriter.java Remove unused method. Co-authored-by: Andrea Selva --- .../java/org/logstash/common/io/DeadLetterQueueWriter.java | 5 ----- 1 file changed, 5 deletions(-) diff --git a/logstash-core/src/main/java/org/logstash/common/io/DeadLetterQueueWriter.java b/logstash-core/src/main/java/org/logstash/common/io/DeadLetterQueueWriter.java index d3fbbe2f9a..a0344d63e7 100644 --- a/logstash-core/src/main/java/org/logstash/common/io/DeadLetterQueueWriter.java +++ b/logstash-core/src/main/java/org/logstash/common/io/DeadLetterQueueWriter.java @@ -697,11 +697,6 @@ private static boolean alreadyProcessed(final Event event) { return event.includes(DEAD_LETTER_QUEUE_METADATA_KEY); } - @VisibleForTesting - public void runScheduledFlushCheck() { - scheduledFlushCheck(); - } - // main method for flush scheduler private void scheduledFlushCheck() { logger.trace("Running scheduled check"); From 32275e7ef01505d564f8552a800e46e6833d81b5 Mon Sep 17 00:00:00 2001 From: Mashhur Date: Tue, 28 Apr 2026 13:46:41 -0700 Subject: [PATCH 4/6] Move the flush chech interval to the DeadLetterQueueWriter.Builder. Remove confusing scheduler from the docs explanations. unit tests for the only newly introduced conditions. --- docs/reference/dead-letter-queues.md | 2 +- .../common/DeadLetterQueueFactory.java | 6 +- .../common/io/DeadLetterQueueWriter.java | 92 +++++++-------- .../common/io/DeadLetterQueueReaderTest.java | 26 ++--- ...DeadLetterQueueWriterAgeRetentionTest.java | 11 +- .../common/io/DeadLetterQueueWriterTest.java | 106 +++++++++--------- 6 files changed, 122 insertions(+), 121 deletions(-) diff --git a/docs/reference/dead-letter-queues.md b/docs/reference/dead-letter-queues.md index 71bc18c37d..233d5d6df8 100644 --- a/docs/reference/dead-letter-queues.md +++ b/docs/reference/dead-letter-queues.md @@ -78,7 +78,7 @@ Note that this value cannot be set to lower than 1000ms. dead_letter_queue.flush_interval: 5000 ``` -Dead letter queue scheduler checks are controlled by the `dead_letter_queue.flush_check_interval` setting. This setting controls how frequently the background scheduler checks for stale segments that need to be flushed. This setting is in milliseconds, and defaults to 1000ms. A smaller value ensures faster segment rotation when infrequent writes occur, at the cost of CPU consumption with more frequent scheduler execution. A larger value reduces scheduler overhead but delays segment sealing, for the worst case it will be `dead_letter_queue.flush_interval` + `dead_letter_queue.flush_check_interval`. This value cannot be set to lower than 1000ms. +Stale segments files are periodically checked if they need to be flushed. This period is controlled by the `dead_letter_queue.flush_check_interval` setting. This setting is in milliseconds, and defaults to 1000ms. A smaller value ensures faster segment rotation when infrequent writes occur, at the cost of CPU consumption with more frequent scheduler execution. A larger value reduces scheduler overhead but delays segment sealing, for the worst case it will be `dead_letter_queue.flush_interval` + `dead_letter_queue.flush_check_interval`. This value cannot be set to lower than 1000ms. ```yaml dead_letter_queue.flush_check_interval: 1000 diff --git a/logstash-core/src/main/java/org/logstash/common/DeadLetterQueueFactory.java b/logstash-core/src/main/java/org/logstash/common/DeadLetterQueueFactory.java index 45d9596083..2b1a476a96 100644 --- a/logstash-core/src/main/java/org/logstash/common/DeadLetterQueueFactory.java +++ b/logstash-core/src/main/java/org/logstash/common/DeadLetterQueueFactory.java @@ -111,8 +111,7 @@ private static DeadLetterQueueWriter newWriter(final String id, final String dlq final Duration flushInterval, final Duration flushCheckInterval, final QueueStorageType storageType) { try { return DeadLetterQueueWriter - .newBuilder(Paths.get(dlqPath, id), MAX_SEGMENT_SIZE_BYTES, maxQueueSize, flushInterval) - .flushCheckInterval(flushCheckInterval) + .newBuilder(Paths.get(dlqPath, id), MAX_SEGMENT_SIZE_BYTES, maxQueueSize, flushInterval, flushCheckInterval) .pipelineId(id) .storageType(storageType) .build(); @@ -127,8 +126,7 @@ private static DeadLetterQueueWriter newWriter(final String id, final String dlq final Duration age) { try { return DeadLetterQueueWriter - .newBuilder(Paths.get(dlqPath, id), MAX_SEGMENT_SIZE_BYTES, maxQueueSize, flushInterval) - .flushCheckInterval(flushCheckInterval) + .newBuilder(Paths.get(dlqPath, id), MAX_SEGMENT_SIZE_BYTES, maxQueueSize, flushInterval, flushCheckInterval) .pipelineId(id) .storageType(storageType) .retentionTime(age) diff --git a/logstash-core/src/main/java/org/logstash/common/io/DeadLetterQueueWriter.java b/logstash-core/src/main/java/org/logstash/common/io/DeadLetterQueueWriter.java index a0344d63e7..5aea07d220 100644 --- a/logstash-core/src/main/java/org/logstash/common/io/DeadLetterQueueWriter.java +++ b/logstash-core/src/main/java/org/logstash/common/io/DeadLetterQueueWriter.java @@ -145,7 +145,6 @@ public String toString() { private volatile int currentSegmentIndex; private volatile Timestamp lastEntryTimestamp; private final Duration flushInterval; - private final Duration flushCheckInterval; private final AtomicBoolean open = new AtomicBoolean(true); private final LongAdder droppedEvents = new LongAdder(); private final LongAdder expiredEvents = new LongAdder(); @@ -180,11 +179,11 @@ private static class FixedRateScheduler implements SchedulerService { private final ScheduledExecutorService scheduledExecutor; private final long checkIntervalMs; - FixedRateScheduler(final Duration flushCheckInterval, final String pipelineId) { + FixedRateScheduler(final long flushCheckInterval, final String pipelineId) { //Set the name with pipeline ID for better visibility final String threadName = pipelineId != null ? "dlq-flush-check[" + pipelineId + "]" : "dlq-flush-check"; - this.checkIntervalMs = Math.max(flushCheckInterval.toMillis(), MIN_FLUSH_CHECK_INTERVAL.toMillis()); + this.checkIntervalMs = flushCheckInterval; scheduledExecutor = Executors.newScheduledThreadPool(1, r -> { Thread t = new Thread(r); //Allow this thread to die when the JVM dies @@ -223,23 +222,24 @@ public static final class Builder { private final long maxSegmentSize; private final long maxQueueSize; private final Duration flushInterval; + private final Duration flushCheckInterval; private boolean startScheduledFlusher; private QueueStorageType storageType = QueueStorageType.DROP_NEWER; private Duration retentionTime = null; - private Duration flushCheckInterval = Duration.ofMillis(1000); private Clock clock = Clock.systemDefaultZone(); private SchedulerService customSchedulerService = null; private String pipelineId; - private Builder(Path queuePath, long maxSegmentSize, long maxQueueSize, Duration flushInterval) { - this(queuePath, maxSegmentSize, maxQueueSize, flushInterval, true); + private Builder(Path queuePath, long maxSegmentSize, long maxQueueSize, final Duration flushInterval, final Duration flushCheckInterval) { + this(queuePath, maxSegmentSize, maxQueueSize, flushInterval, flushCheckInterval, true); } - private Builder(Path queuePath, long maxSegmentSize, long maxQueueSize, Duration flushInterval, boolean startScheduledFlusher) { + private Builder(Path queuePath, long maxSegmentSize, long maxQueueSize, final Duration flushInterval, final Duration flushCheckInterval, boolean startScheduledFlusher) { this.queuePath = queuePath; this.maxSegmentSize = maxSegmentSize; this.maxQueueSize = maxQueueSize; this.flushInterval = flushInterval; + this.flushCheckInterval = flushCheckInterval; this.startScheduledFlusher = startScheduledFlusher; } @@ -253,11 +253,6 @@ public Builder retentionTime(Duration retentionTime) { return this; } - public Builder flushCheckInterval(final Duration interval) { - this.flushCheckInterval = interval; - return this; - } - public Builder pipelineId(final String pipelineId) { this.pipelineId = pipelineId; return this; @@ -280,56 +275,67 @@ public DeadLetterQueueWriter build() throws IOException { throw new IllegalArgumentException("Both default scheduler and custom scheduler were defined, "); } - Duration effectiveFlushInterval = flushInterval; - if (startScheduledFlusher && flushInterval.compareTo(MIN_FLUSH_PERIOD) < 0) { - logger.warn("dead_letter_queue.flush_interval ({} ms) is below the minimum of {} ms; using {} ms", - flushInterval.toMillis(), MIN_FLUSH_PERIOD.toMillis(), MIN_FLUSH_PERIOD.toMillis()); - effectiveFlushInterval = MIN_FLUSH_PERIOD; - } - - Duration effectiveFlushCheckInterval = flushCheckInterval; - if (startScheduledFlusher) { - // Clamp to maximum (can't exceed flush interval) - if (effectiveFlushCheckInterval.compareTo(effectiveFlushInterval) > 0) { - logger.warn("dead_letter_queue.flush_check_interval ({} ms) cannot be greater than dead_letter_queue.flush_interval ({} ms); using {} ms", - effectiveFlushCheckInterval.toMillis(), effectiveFlushInterval.toMillis(), effectiveFlushInterval.toMillis()); - effectiveFlushCheckInterval = effectiveFlushInterval; - } - // Clamp to minimum (for scheduler only, don't change stored value) - if (effectiveFlushCheckInterval.compareTo(MIN_FLUSH_CHECK_INTERVAL) < 0) { - logger.warn("dead_letter_queue.flush_check_interval ({} ms) is below the minimum of {} ms; using {} ms for the scheduler", - effectiveFlushCheckInterval.toMillis(), MIN_FLUSH_CHECK_INTERVAL.toMillis(), MIN_FLUSH_CHECK_INTERVAL.toMillis()); - } - } + final Duration normalizedFlushInterval = normalizeFlushInterval(flushInterval); SchedulerService schedulerService; if (customSchedulerService != null) { schedulerService = customSchedulerService; } else { if (startScheduledFlusher) { - // Use clamped-to-max value for scheduler (which will further clamp to MIN in FixedRateScheduler) - schedulerService = new FixedRateScheduler(effectiveFlushCheckInterval, pipelineId); + final Duration normalizedFlushCheckInterval = normalizeFlushCheckInterval(flushCheckInterval, normalizedFlushInterval); + schedulerService = new FixedRateScheduler(normalizedFlushCheckInterval.toMillis(), pipelineId); } else { schedulerService = new NoopScheduler(); } } - return new DeadLetterQueueWriter(queuePath, maxSegmentSize, maxQueueSize, effectiveFlushInterval, effectiveFlushCheckInterval, storageType, retentionTime, clock, schedulerService); + return new DeadLetterQueueWriter(queuePath, maxSegmentSize, maxQueueSize, normalizedFlushInterval, storageType, retentionTime, clock, schedulerService); + } + + @VisibleForTesting + Duration normalizeFlushInterval(final Duration flushInterval) { + if (!startScheduledFlusher) return flushInterval; + + Duration effectiveFlushInterval = flushInterval; + if (flushInterval.compareTo(MIN_FLUSH_PERIOD) < 0) { + logger.warn("dead_letter_queue.flush_interval ({} ms) is below the minimum of {} ms; using {} ms", + flushInterval.toMillis(), MIN_FLUSH_PERIOD.toMillis(), MIN_FLUSH_PERIOD.toMillis()); + effectiveFlushInterval = MIN_FLUSH_PERIOD; + } + return effectiveFlushInterval; + } + + @VisibleForTesting + Duration normalizeFlushCheckInterval(final Duration flushCheckInterval, final Duration effectiveFlushInterval) { + Duration effectiveFlushCheckInterval = flushCheckInterval; + // can't exceed flush interval + if (effectiveFlushCheckInterval.compareTo(effectiveFlushInterval) > 0) { + logger.warn("dead_letter_queue.flush_check_interval ({} ms) cannot be greater than dead_letter_queue.flush_interval ({} ms); using {} ms", + effectiveFlushCheckInterval.toMillis(), effectiveFlushInterval.toMillis(), effectiveFlushInterval.toMillis()); + effectiveFlushCheckInterval = effectiveFlushInterval; + } + // can't be less than 1s + if (effectiveFlushCheckInterval.compareTo(MIN_FLUSH_CHECK_INTERVAL) < 0) { + logger.warn("dead_letter_queue.flush_check_interval ({} ms) is below the minimum of {} ms; using {} ms for the flush check interval", + effectiveFlushCheckInterval.toMillis(), MIN_FLUSH_CHECK_INTERVAL.toMillis(), MIN_FLUSH_CHECK_INTERVAL.toMillis()); + effectiveFlushCheckInterval = MIN_FLUSH_CHECK_INTERVAL; + } + return effectiveFlushCheckInterval; } } public static Builder newBuilder(final Path queuePath, final long maxSegmentSize, final long maxQueueSize, - final Duration flushInterval) { - return new Builder(queuePath, maxSegmentSize, maxQueueSize, flushInterval); + final Duration flushInterval, final Duration flushCheckInterval) { + return new Builder(queuePath, maxSegmentSize, maxQueueSize, flushInterval, flushCheckInterval); } @VisibleForTesting static Builder newBuilderWithoutFlusher(final Path queuePath, final long maxSegmentSize, final long maxQueueSize) { - return new Builder(queuePath, maxSegmentSize, maxQueueSize, Duration.ZERO, false); + return new Builder(queuePath, maxSegmentSize, maxQueueSize, Duration.ZERO, Duration.ZERO, false); } private DeadLetterQueueWriter(final Path queuePath, final long maxSegmentSize, final long maxQueueSize, - final Duration flushInterval, final Duration flushCheckInterval, final QueueStorageType storageType, final Duration retentionTime, + final Duration flushInterval, final QueueStorageType storageType, final Duration retentionTime, final Clock clock, SchedulerService flusherService) throws IOException { this.clock = clock; @@ -339,7 +345,6 @@ private DeadLetterQueueWriter(final Path queuePath, final long maxSegmentSize, f this.maxQueueSize = maxQueueSize; this.storageType = storageType; this.flushInterval = flushInterval; - this.flushCheckInterval = flushCheckInterval; this.currentQueueSize = new AtomicLong(computeQueueSize()); this.retentionTime = retentionTime; @@ -417,11 +422,6 @@ public long getCurrentQueueSize() { return currentQueueSize.longValue(); } - @VisibleForTesting - public Duration flushCheckInterval() { - return flushCheckInterval; - } - public String getStoragePolicy() { return storageType.name().toLowerCase(Locale.ROOT); } diff --git a/logstash-core/src/test/java/org/logstash/common/io/DeadLetterQueueReaderTest.java b/logstash-core/src/test/java/org/logstash/common/io/DeadLetterQueueReaderTest.java index da8ae7e91e..629285378f 100644 --- a/logstash-core/src/test/java/org/logstash/common/io/DeadLetterQueueReaderTest.java +++ b/logstash-core/src/test/java/org/logstash/common/io/DeadLetterQueueReaderTest.java @@ -157,7 +157,7 @@ public void testRereadFinalBlock() throws Exception { long startTime = System.currentTimeMillis(); int messageSize = 0; try(DeadLetterQueueWriter writeManager = DeadLetterQueueWriter - .newBuilder(dir, 10 * 1024 * 1024, defaultDlqSize, Duration.ofSeconds(1)) + .newBuilder(dir, 10 * 1024 * 1024, defaultDlqSize, Duration.ofSeconds(1), Duration.ofSeconds(1)) .build()) { for (int i = 0; i < 2; i++) { DLQEntry entry = new DLQEntry(event, "", "", String.valueOf(i), constantSerializationLengthTimestamp(startTime++)); @@ -214,7 +214,7 @@ private void writeSegmentSizeEntries(int count) throws IOException { DLQEntry templateEntry = new DLQEntry(event, "1", "1", String.valueOf(0), constantSerializationLengthTimestamp(startTime)); int size = templateEntry.serialize().length + RecordIOWriter.RECORD_HEADER_SIZE + VERSION_SIZE; try (DeadLetterQueueWriter writeManager = DeadLetterQueueWriter - .newBuilder(dir, size, defaultDlqSize, Duration.ofSeconds(1)) + .newBuilder(dir, size, defaultDlqSize, Duration.ofSeconds(1), Duration.ofSeconds(1)) .build()) { for (int i = 1; i <= count; i++) { writeManager.writeEntry(new DLQEntry(event, "1", "1", String.valueOf(i), constantSerializationLengthTimestamp(startTime++))); @@ -248,7 +248,7 @@ public void testBlockBoundary() throws Exception { Timestamp timestamp = constantSerializationLengthTimestamp(); try (DeadLetterQueueWriter writeManager = DeadLetterQueueWriter - .newBuilder(dir, 10 * 1024 * 1024, defaultDlqSize, Duration.ofSeconds(1)) + .newBuilder(dir, 10 * 1024 * 1024, defaultDlqSize, Duration.ofSeconds(1), Duration.ofSeconds(1)) .build()) { for (int i = 0; i < 2; i++) { DLQEntry entry = new DLQEntry(event, "", "", "", timestamp); @@ -273,7 +273,7 @@ public void testBlockBoundaryMultiple() throws Exception { long startTime = System.currentTimeMillis(); int messageSize = 0; try (DeadLetterQueueWriter writeManager = DeadLetterQueueWriter - .newBuilder(dir, 10 * 1024 * 1024, defaultDlqSize, Duration.ofSeconds(1)) + .newBuilder(dir, 10 * 1024 * 1024, defaultDlqSize, Duration.ofSeconds(1), Duration.ofSeconds(1)) .build()) { for (int i = 1; i <= 5; i++) { DLQEntry entry = new DLQEntry(event, "", "", "", constantSerializationLengthTimestamp(startTime++)); @@ -298,7 +298,7 @@ public void testFlushAfterWriterClose() throws Exception { Timestamp timestamp = new Timestamp(); try (DeadLetterQueueWriter writeManager = DeadLetterQueueWriter - .newBuilder(dir, BLOCK_SIZE, defaultDlqSize, Duration.ofSeconds(1)) + .newBuilder(dir, BLOCK_SIZE, defaultDlqSize, Duration.ofSeconds(1), Duration.ofSeconds(1)) .build()) { for (int i = 0; i < 6; i++) { DLQEntry entry = new DLQEntry(event, "", "", Integer.toString(i), timestamp); @@ -320,7 +320,7 @@ public void testFlushAfterSegmentComplete() throws Exception { event.setField("T", generateMessageContent(PAD_FOR_BLOCK_SIZE_EVENT)); Timestamp timestamp = new Timestamp(); try (DeadLetterQueueWriter writeManager = DeadLetterQueueWriter - .newBuilder(dir, BLOCK_SIZE * EVENTS_BEFORE_FLUSH, defaultDlqSize, Duration.ofHours(1)) + .newBuilder(dir, BLOCK_SIZE * EVENTS_BEFORE_FLUSH, defaultDlqSize, Duration.ofHours(1), Duration.ofSeconds(1)) .build()) { for (int i = 1; i < EVENTS_BEFORE_FLUSH; i++) { DLQEntry entry = new DLQEntry(event, "", "", Integer.toString(i), timestamp); @@ -355,7 +355,7 @@ public void testMultiFlushAfterSegmentComplete() throws Exception { Timestamp timestamp = new Timestamp(); try (DeadLetterQueueWriter writeManager = DeadLetterQueueWriter - .newBuilder(dir, BLOCK_SIZE * eventsInSegment, defaultDlqSize, Duration.ofHours(1)) + .newBuilder(dir, BLOCK_SIZE * eventsInSegment, defaultDlqSize, Duration.ofHours(1), Duration.ofSeconds(1)) .build()) { for (int i = 1; i < totalEventsToWrite; i++) { DLQEntry entry = new DLQEntry(event, "", "", Integer.toString(i), timestamp); @@ -398,7 +398,7 @@ public void testFlushAfterDelay() throws Exception { System.out.println("events per block= " + eventsPerBlock); try(DeadLetterQueueWriter writeManager = DeadLetterQueueWriter - .newBuilder(dir, BLOCK_SIZE, defaultDlqSize, Duration.ofSeconds(2)) + .newBuilder(dir, BLOCK_SIZE, defaultDlqSize, Duration.ofSeconds(2), Duration.ofSeconds(1)) .build()) { for (int i = 1; i < eventsToWrite; i++) { DLQEntry entry = new DLQEntry(event, "", "", Integer.toString(i), timestamp); @@ -432,7 +432,7 @@ public void testBlockAndSegmentBoundary() throws Exception { Timestamp timestamp = constantSerializationLengthTimestamp(); try(DeadLetterQueueWriter writeManager = DeadLetterQueueWriter - .newBuilder(dir, BLOCK_SIZE, defaultDlqSize, Duration.ofSeconds(1)) + .newBuilder(dir, BLOCK_SIZE, defaultDlqSize, Duration.ofSeconds(1), Duration.ofSeconds(1)) .build()) { for (int i = 0; i < 2; i++) { DLQEntry entry = new DLQEntry(event, "", "", "", timestamp); @@ -455,7 +455,7 @@ public void testWriteReadRandomEventSize() throws Exception { long startTime = System.currentTimeMillis(); try(DeadLetterQueueWriter writeManager = DeadLetterQueueWriter - .newBuilder(dir, 10 * 1024 * 1024, defaultDlqSize, Duration.ofSeconds(1)) + .newBuilder(dir, 10 * 1024 * 1024, defaultDlqSize, Duration.ofSeconds(1), Duration.ofSeconds(1)) .build()) { for (int i = 0; i < eventCount; i++) { event.setField("message", generateMessageContent((int)(Math.random() * (maxEventSize)))); @@ -575,7 +575,7 @@ public void testConcurrentWriteReadRandomEventSize() throws Exception { final Event event = new Event(); long startTime = System.currentTimeMillis(); try (DeadLetterQueueWriter writeManager = DeadLetterQueueWriter - .newBuilder(dir, 10 * 1024 * 1024, defaultDlqSize, Duration.ofSeconds(10)) + .newBuilder(dir, 10 * 1024 * 1024, defaultDlqSize, Duration.ofSeconds(10), Duration.ofSeconds(1)) .build()) { for (int i = 0; i < eventCount; i++) { event.setField( @@ -920,7 +920,7 @@ private void seekReadAndVerify(final Timestamp seekTarget, final String expected private void writeEntries(final Event event, int offset, final int numberOfEvents, long startTime) throws IOException { try (DeadLetterQueueWriter writeManager = DeadLetterQueueWriter - .newBuilder(dir, 10 * 1024 * 1024, defaultDlqSize, Duration.ofSeconds(1)) + .newBuilder(dir, 10 * 1024 * 1024, defaultDlqSize, Duration.ofSeconds(1), Duration.ofSeconds(1)) .build()) { for (int i = offset; i <= offset + numberOfEvents; i++) { DLQEntry entry = new DLQEntry(event, "foo", "bar", String.valueOf(i), new Timestamp(startTime++)); @@ -943,7 +943,7 @@ private int prepareFilledSegmentFiles(int segments, long start) throws IOExcepti final int maxSegmentSize = 10 * MB; final int loopPerSegment = (int) Math.floor((maxSegmentSize - 1.0) / BLOCK_SIZE); try (DeadLetterQueueWriter writer = DeadLetterQueueWriter - .newBuilder(dir, maxSegmentSize, defaultDlqSize, Duration.ofSeconds(1)) + .newBuilder(dir, maxSegmentSize, defaultDlqSize, Duration.ofSeconds(1), Duration.ofSeconds(1)) .build()) { final int loops = loopPerSegment * segments; for (int i = 0; i < loops; i++) { diff --git a/logstash-core/src/test/java/org/logstash/common/io/DeadLetterQueueWriterAgeRetentionTest.java b/logstash-core/src/test/java/org/logstash/common/io/DeadLetterQueueWriterAgeRetentionTest.java index 2f6e6161c2..ce3c934444 100644 --- a/logstash-core/src/test/java/org/logstash/common/io/DeadLetterQueueWriterAgeRetentionTest.java +++ b/logstash-core/src/test/java/org/logstash/common/io/DeadLetterQueueWriterAgeRetentionTest.java @@ -86,7 +86,7 @@ public void testRemovesOlderSegmentsWhenWriteOnReopenedDLQContainingExpiredSegme final long prevQueueSize; final long beheadedQueueSize; try (DeadLetterQueueWriter writeManager = DeadLetterQueueWriter - .newBuilder(dir, 10 * MB, 1 * GB, Duration.ofSeconds(1)) + .newBuilder(dir, 10 * MB, 1 * GB, Duration.ofSeconds(1), Duration.ofSeconds(1)) .retentionTime(Duration.ofDays(2)) .clock(fakeClock) .build()) { @@ -110,7 +110,7 @@ private void prepareDLQWithFirstSegmentOlderThanRetainPeriod(Event event, Clock long startTime = fakeClock.instant().minus(littleMoreThanRetainedPeriod).toEpochMilli(); int messageSize = 0; try (DeadLetterQueueWriter writeManager = DeadLetterQueueWriter - .newBuilder(dir, 10 * MB, 1 * GB, Duration.ofSeconds(1)) + .newBuilder(dir, 10 * MB, 1 * GB, Duration.ofSeconds(1), Duration.ofSeconds(1)) .retentionTime(retainedPeriod) .clock(fakeClock) .build()) { @@ -137,7 +137,7 @@ public void testRemovesOlderSegmentsWhenWritesIntoDLQContainingExpiredSegments() final Duration retention = Duration.ofDays(2); try (DeadLetterQueueWriter writeManager = DeadLetterQueueWriter - .newBuilder(dir, 10 * MB, 1 * GB, Duration.ofSeconds(1)) + .newBuilder(dir, 10 * MB, 1 * GB, Duration.ofSeconds(1), Duration.ofSeconds(1)) .retentionTime(retention) .clock(fakeClock) .build()) { @@ -180,7 +180,7 @@ public void testRemoveMultipleOldestSegmentsWhenRetainedAgeIsExceeded() throws I final Duration retention = Duration.ofDays(2); try (DeadLetterQueueWriter writeManager = DeadLetterQueueWriter - .newBuilder(dir, 10 * MB, 1 * GB, Duration.ofSeconds(1)) + .newBuilder(dir, 10 * MB, 1 * GB, Duration.ofSeconds(1), Duration.ofSeconds(1)) .retentionTime(retention) .clock(fakeClock) .build()) { @@ -272,9 +272,8 @@ public void testDLQWriterFlusherRemovesExpiredSegmentWhenCurrentWriterIsStale() final ManualAdvanceClock fakeClock = new ManualAdvanceClock(ZoneId.of("Europe/Rome")); Duration retainedPeriod = Duration.ofDays(1); - Duration flushInterval = Duration.ofSeconds(1); try (DeadLetterQueueWriter writeManager = DeadLetterQueueWriter - .newBuilder(dir, 10 * MB, 1 * GB, flushInterval) + .newBuilder(dir, 10 * MB, 1 * GB, Duration.ofSeconds(1), Duration.ofSeconds(1)) .retentionTime(retainedPeriod) .clock(fakeClock) .build()) { diff --git a/logstash-core/src/test/java/org/logstash/common/io/DeadLetterQueueWriterTest.java b/logstash-core/src/test/java/org/logstash/common/io/DeadLetterQueueWriterTest.java index 9b886a847f..c451482179 100644 --- a/logstash-core/src/test/java/org/logstash/common/io/DeadLetterQueueWriterTest.java +++ b/logstash-core/src/test/java/org/logstash/common/io/DeadLetterQueueWriterTest.java @@ -76,7 +76,7 @@ public void setUp() throws Exception { public void testLockFileManagement() throws Exception { Path lockFile = dir.resolve(".lock"); DeadLetterQueueWriter writer = DeadLetterQueueWriter - .newBuilder(dir, 1_000, 100_000, Duration.ofSeconds(1)) + .newBuilder(dir, 1_000, 100_000, Duration.ofSeconds(1), Duration.ofSeconds(1)) .build(); assertTrue(Files.exists(lockFile)); writer.close(); @@ -86,11 +86,11 @@ public void testLockFileManagement() throws Exception { @Test public void testFileLocking() throws Exception { DeadLetterQueueWriter writer = DeadLetterQueueWriter - .newBuilder(dir, 1_000, 100_000, Duration.ofSeconds(1)) + .newBuilder(dir, 1_000, 100_000, Duration.ofSeconds(1), Duration.ofSeconds(1)) .build(); try { DeadLetterQueueWriter - .newBuilder(dir, 100, 1_000, Duration.ofSeconds(1)) + .newBuilder(dir, 100, 1_000, Duration.ofSeconds(1), Duration.ofSeconds(1)) .build(); fail(); } catch (LockException e) { @@ -104,7 +104,7 @@ public void testUncleanCloseOfPreviousWriter() throws Exception { Path lockFilePath = dir.resolve(".lock"); boolean created = lockFilePath.toFile().createNewFile(); DeadLetterQueueWriter writer = DeadLetterQueueWriter - .newBuilder(dir, 1_000, 100_000, Duration.ofSeconds(1)) + .newBuilder(dir, 1_000, 100_000, Duration.ofSeconds(1), Duration.ofSeconds(1)) .build(); FileChannel channel = FileChannel.open(lockFilePath, StandardOpenOption.WRITE); @@ -121,7 +121,7 @@ public void testUncleanCloseOfPreviousWriter() throws Exception { @Test public void testWrite() throws Exception { DeadLetterQueueWriter writer = DeadLetterQueueWriter - .newBuilder(dir, 1_000, 100_000, Duration.ofSeconds(1)) + .newBuilder(dir, 1_000, 100_000, Duration.ofSeconds(1), Duration.ofSeconds(1)) .build(); DLQEntry entry = new DLQEntry(new Event(), "type", "id", "reason"); writer.writeEntry(entry); @@ -136,7 +136,7 @@ public void testDoesNotWriteMessagesAlreadyRoutedThroughDLQ() throws Exception { DLQEntry dlqEntry = new DLQEntry(dlqEvent, "type", "id", "reason"); try (DeadLetterQueueWriter writer = DeadLetterQueueWriter - .newBuilder(dir, 1_000, 100_000, Duration.ofSeconds(1)) + .newBuilder(dir, 1_000, 100_000, Duration.ofSeconds(1), Duration.ofSeconds(1)) .build()) { writer.writeEntry(entry); long dlqLengthAfterEvent = dlqLength(); @@ -157,7 +157,7 @@ public void testDoesNotWriteBeyondLimit() throws Exception { try (DeadLetterQueueWriter writer = DeadLetterQueueWriter - .newBuilder(dir, payloadLength, MAX_QUEUE_LENGTH, Duration.ofSeconds(1)) + .newBuilder(dir, payloadLength, MAX_QUEUE_LENGTH, Duration.ofSeconds(1), Duration.ofSeconds(1)) .build()) { for (int i = 0; i < MESSAGE_COUNT; i++) @@ -175,7 +175,7 @@ public void testDoesNotWriteBeyondLimit() throws Exception { @Test public void testSlowFlush() throws Exception { try (DeadLetterQueueWriter writer = DeadLetterQueueWriter - .newBuilder(dir, 1_000, 1_000_000, Duration.ofSeconds(1)) + .newBuilder(dir, 1_000, 1_000_000, Duration.ofSeconds(1), Duration.ofSeconds(1)) .build()) { DLQEntry entry = new DLQEntry(new Event(), "type", "id", "1"); writer.writeEntry(entry); @@ -197,7 +197,7 @@ public void testSlowFlush() throws Exception { @Test public void testNotFlushed() throws Exception { try (DeadLetterQueueWriter writeManager = DeadLetterQueueWriter - .newBuilder(dir, BLOCK_SIZE, 1_000_000_000, Duration.ofSeconds(5)) + .newBuilder(dir, BLOCK_SIZE, 1_000_000_000, Duration.ofSeconds(5), Duration.ofSeconds(1)) .build()) { for (int i = 0; i < 4; i++) { DLQEntry entry = new DLQEntry(new Event(), "type", "id", "1"); @@ -220,7 +220,7 @@ public void testNotFlushed() throws Exception { @Test public void testCloseFlush() throws Exception { try (DeadLetterQueueWriter writer = DeadLetterQueueWriter - .newBuilder(dir, 1_000, 1_000_000, Duration.ofHours(1)) + .newBuilder(dir, 1_000, 1_000_000, Duration.ofHours(1), Duration.ofSeconds(1)) .build()) { DLQEntry entry = new DLQEntry(new Event(), "type", "id", "1"); writer.writeEntry(entry); @@ -256,7 +256,7 @@ public void testRemoveOldestSegmentWhenRetainedSizeIsExceededAndDropOlderModeIsE int messageSize = 0; try (DeadLetterQueueWriter writeManager = DeadLetterQueueWriter - .newBuilder(dir, 10 * MB, 20 * MB, Duration.ofSeconds(1)) + .newBuilder(dir, 10 * MB, 20 * MB, Duration.ofSeconds(1), Duration.ofSeconds(1)) .build()) { // 320 generates 10 Mb of data @@ -288,7 +288,7 @@ public void testRemoveOldestSegmentWhenRetainedSizeIsExceededAndDropOlderModeIsE final long beheadedQueueSize; long droppedEvent; try (DeadLetterQueueWriter writeManager = DeadLetterQueueWriter - .newBuilder(dir, 10 * MB, 20 * MB, Duration.ofSeconds(1)) + .newBuilder(dir, 10 * MB, 20 * MB, Duration.ofSeconds(1), Duration.ofSeconds(1)) .storageType(QueueStorageType.DROP_OLDER) .build()) { prevQueueSize = writeManager.getCurrentQueueSize(); @@ -326,7 +326,7 @@ public void testRemoveOldestSegmentWhenRetainedSizeIsExceededAndDropOlderModeIsE @Test public void testRemoveSegmentsOrder() throws IOException { try (DeadLetterQueueWriter sut = DeadLetterQueueWriter - .newBuilder(dir, 10 * MB, 20 * MB, Duration.ofSeconds(1)) + .newBuilder(dir, 10 * MB, 20 * MB, Duration.ofSeconds(1), Duration.ofSeconds(1)) .build()) { // create some segments files Files.createFile(dir.resolve("9.log")); @@ -496,7 +496,7 @@ public void testDropEventCountCorrectlyNotEnqueuedEvents() throws IOException, I bigEvent.setField("message", DeadLetterQueueReaderTest.generateMessageContent(2 * BLOCK_SIZE)); try (DeadLetterQueueWriter writeManager = DeadLetterQueueWriter - .newBuilder(dir, 10 * MB, 20 * MB, Duration.ofSeconds(1)) + .newBuilder(dir, 10 * MB, 20 * MB, Duration.ofSeconds(1), Duration.ofSeconds(1)) .build()) { // enqueue a record with size smaller than BLOCK_SIZE DLQEntry entry = new DLQEntry(blockAlmostFullEvent, "", "", "00001", DeadLetterQueueReaderTest.constantSerializationLengthTimestamp(System.currentTimeMillis())); @@ -514,7 +514,7 @@ public void testDropEventCountCorrectlyNotEnqueuedEvents() throws IOException, I Event event = DeadLetterQueueReaderTest.createEventWithConstantSerializationOverhead(Collections.emptyMap()); event.setField("message", DeadLetterQueueReaderTest.generateMessageContent(32500)); try (DeadLetterQueueWriter writeManager = DeadLetterQueueWriter - .newBuilder(dir, 10 * MB, 20 * MB, Duration.ofSeconds(1)) + .newBuilder(dir, 10 * MB, 20 * MB, Duration.ofSeconds(1), Duration.ofSeconds(1)) .storageType(QueueStorageType.DROP_NEWER) .build()) { @@ -544,7 +544,7 @@ public void testInitializeWriterWith1ByteEntry() throws Exception { Files.write(dir.resolve("1.log"), "1".getBytes()); DeadLetterQueueWriter writer = DeadLetterQueueWriter - .newBuilder(dir, 1_000, 100_000, Duration.ofSeconds(1)) + .newBuilder(dir, 1_000, 100_000, Duration.ofSeconds(1), Duration.ofSeconds(1)) .build(); writer.close(); } @@ -552,7 +552,7 @@ public void testInitializeWriterWith1ByteEntry() throws Exception { @Test public void givenDLQWriterCreatedSomeSegmentsWhenReaderWithCleanConsumedNotifyTheDeletionOfSomeThenWriterUpdatesItsMetricsSize() throws IOException, InterruptedException { try (DeadLetterQueueWriter writer = DeadLetterQueueWriter - .newBuilder(dir, 1 * MB, 100 * MB, Duration.ofSeconds(1)) + .newBuilder(dir, 1 * MB, 100 * MB, Duration.ofSeconds(1), Duration.ofSeconds(1)) .build()) { // fill at least 3 segments @@ -617,48 +617,52 @@ private long countDlqSegments(Path dir) throws IOException { } @Test - public void testFlushCheckIntervalIsRespected() throws IOException { - final Duration flushCheckInterval = Duration.ofMillis(500); - DeadLetterQueueWriter writer = DeadLetterQueueWriter - .newBuilder(dir, 1_000, 100_000, Duration.ofSeconds(5)) - .flushCheckInterval(flushCheckInterval) - .build(); - - assertEquals("flushCheckInterval should be applied to the writer", flushCheckInterval.toMillis(), - writer.flushCheckInterval().toMillis()); - - writer.close(); + public void testNormalizeFlushIntervalWithValidInterval() { + DeadLetterQueueWriter.Builder builder = DeadLetterQueueWriter.newBuilder(dir, 1_000, 100_000, Duration.ofSeconds(5), Duration.ofSeconds(1)); + Duration result = builder.normalizeFlushInterval(Duration.ofSeconds(10)); + assertEquals("Valid flush interval should remain unchanged", Duration.ofSeconds(10), result); } @Test - public void testFlushCheckIntervalClampedToMinimum() throws IOException { - final Duration belowMinimum = Duration.ofMillis(500); - DeadLetterQueueWriter writer = DeadLetterQueueWriter - .newBuilder(dir, 1_000, 100_000, Duration.ofSeconds(5)) - .flushCheckInterval(belowMinimum) - .build(); - - // The writer stores the configured value as-is; the scheduler enforces a minimum of 1000ms internally - assertEquals("flushCheckInterval should be stored as configured", - belowMinimum.toMillis(), writer.flushCheckInterval().toMillis()); - - writer.close(); + public void testNormalizeFlushIntervalBelowMinimum() { + DeadLetterQueueWriter.Builder builder = DeadLetterQueueWriter.newBuilder(dir, 1_000, 100_000, Duration.ofSeconds(5), Duration.ofSeconds(1)); + Duration result = builder.normalizeFlushInterval(Duration.ofMillis(100)); + assertEquals("Flush interval below 1s should be clamped to 1s", Duration.ofSeconds(1), result); } @Test - public void testFlushCheckIntervalClampedToFlushInterval() throws IOException { - final Duration flushInterval = Duration.ofSeconds(3); - final Duration aboveFlushInterval = Duration.ofSeconds(5); - DeadLetterQueueWriter writer = DeadLetterQueueWriter - .newBuilder(dir, 1_000, 100_000, flushInterval) - .flushCheckInterval(aboveFlushInterval) - .build(); + public void testNormalizeFlushCheckIntervalWithinLimits() { + DeadLetterQueueWriter.Builder builder = DeadLetterQueueWriter.newBuilder(dir, 1_000, 100_000, Duration.ofSeconds(5), Duration.ofSeconds(1)); + Duration flushInterval = Duration.ofSeconds(5); + Duration flushCheckInterval = Duration.ofSeconds(2); + Duration result = builder.normalizeFlushCheckInterval(flushCheckInterval, flushInterval); + assertEquals("Valid flush check interval should remain unchanged", flushCheckInterval, result); + } - // The writer should clamp flushCheckInterval to not exceed flushInterval - assertTrue("flushCheckInterval should not exceed flushInterval", - writer.flushCheckInterval().compareTo(flushInterval) <= 0); + @Test + public void testNormalizeFlushCheckIntervalBelowMinimum() { + DeadLetterQueueWriter.Builder builder = DeadLetterQueueWriter.newBuilder(dir, 1_000, 100_000, Duration.ofSeconds(5), Duration.ofSeconds(1)); + Duration flushInterval = Duration.ofSeconds(5); + Duration belowMinimum = Duration.ofMillis(500); + Duration result = builder.normalizeFlushCheckInterval(belowMinimum, flushInterval); + assertEquals("Flush check interval below 1s should be clamped to 1s", Duration.ofSeconds(1), result); + } - writer.close(); + @Test + public void testNormalizeFlushCheckIntervalExceedsFlushInterval() { + DeadLetterQueueWriter.Builder builder = DeadLetterQueueWriter.newBuilder(dir, 1_000, 100_000, Duration.ofSeconds(5), Duration.ofSeconds(1)); + Duration flushInterval = Duration.ofSeconds(3); + Duration aboveFlushInterval = Duration.ofSeconds(5); + Duration result = builder.normalizeFlushCheckInterval(aboveFlushInterval, flushInterval); + assertEquals("Flush check interval exceeding flush interval should be clamped to flush interval", flushInterval, result); } + @Test + public void testNormalizeFlushCheckIntervalJustBelowFlushInterval() { + DeadLetterQueueWriter.Builder builder = DeadLetterQueueWriter.newBuilder(dir, 1_000, 100_000, Duration.ofSeconds(5), Duration.ofSeconds(1)); + Duration flushInterval = Duration.ofSeconds(5); + Duration justBelowFlushInterval = Duration.ofMillis(4900); + Duration result = builder.normalizeFlushCheckInterval(justBelowFlushInterval, flushInterval); + assertEquals("Flush check interval just below flush interval should be accepted", justBelowFlushInterval, result); + } } From 86edfb5c6d5e187309cb4b9e74d04b0ab3bfeab4 Mon Sep 17 00:00:00 2001 From: Mashhur <99575341+mashhurs@users.noreply.github.com> Date: Wed, 29 Apr 2026 10:22:53 -0700 Subject: [PATCH 5/6] Apply suggestions from code review Doc consistency and test rename suggestions accepted. Co-authored-by: Andrea Selva --- docs/reference/dead-letter-queues.md | 2 +- .../org/logstash/common/io/DeadLetterQueueWriterTest.java | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/docs/reference/dead-letter-queues.md b/docs/reference/dead-letter-queues.md index 233d5d6df8..7752e37d13 100644 --- a/docs/reference/dead-letter-queues.md +++ b/docs/reference/dead-letter-queues.md @@ -78,7 +78,7 @@ Note that this value cannot be set to lower than 1000ms. dead_letter_queue.flush_interval: 5000 ``` -Stale segments files are periodically checked if they need to be flushed. This period is controlled by the `dead_letter_queue.flush_check_interval` setting. This setting is in milliseconds, and defaults to 1000ms. A smaller value ensures faster segment rotation when infrequent writes occur, at the cost of CPU consumption with more frequent scheduler execution. A larger value reduces scheduler overhead but delays segment sealing, for the worst case it will be `dead_letter_queue.flush_interval` + `dead_letter_queue.flush_check_interval`. This value cannot be set to lower than 1000ms. +Stale segments files are periodically checked if they need to be flushed. This period is controlled by the `dead_letter_queue.flush_check_interval` setting. This setting is in milliseconds, and defaults to 1000ms. A smaller value ensures faster segment rotation when infrequent writes occur, at the cost of CPU consumption with more frequent segment checks execution. A larger value reduces checks overhead but delays segment sealing, for the worst case it will be `dead_letter_queue.flush_interval` + `dead_letter_queue.flush_check_interval`. This value cannot be set to lower than 1000ms. ```yaml dead_letter_queue.flush_check_interval: 1000 diff --git a/logstash-core/src/test/java/org/logstash/common/io/DeadLetterQueueWriterTest.java b/logstash-core/src/test/java/org/logstash/common/io/DeadLetterQueueWriterTest.java index c451482179..202694f097 100644 --- a/logstash-core/src/test/java/org/logstash/common/io/DeadLetterQueueWriterTest.java +++ b/logstash-core/src/test/java/org/logstash/common/io/DeadLetterQueueWriterTest.java @@ -617,14 +617,14 @@ private long countDlqSegments(Path dir) throws IOException { } @Test - public void testNormalizeFlushIntervalWithValidInterval() { + public void givenFlushIntervalGreatherThanMinimumWhenNormalizedThenRemainsUnmodified() { DeadLetterQueueWriter.Builder builder = DeadLetterQueueWriter.newBuilder(dir, 1_000, 100_000, Duration.ofSeconds(5), Duration.ofSeconds(1)); Duration result = builder.normalizeFlushInterval(Duration.ofSeconds(10)); assertEquals("Valid flush interval should remain unchanged", Duration.ofSeconds(10), result); } @Test - public void testNormalizeFlushIntervalBelowMinimum() { + public void givenFlushIntervalBelowTheMinimumWhenNormalizedThenIsClampedToMinimum() { DeadLetterQueueWriter.Builder builder = DeadLetterQueueWriter.newBuilder(dir, 1_000, 100_000, Duration.ofSeconds(5), Duration.ofSeconds(1)); Duration result = builder.normalizeFlushInterval(Duration.ofMillis(100)); assertEquals("Flush interval below 1s should be clamped to 1s", Duration.ofSeconds(1), result); From 46f156878dde15f5f5cd284d27b819ed81f31278 Mon Sep 17 00:00:00 2001 From: Mashhur Date: Wed, 29 Apr 2026 12:14:50 -0700 Subject: [PATCH 6/6] Keep the interval type as a Duration, rename and simplify test suites. --- .../common/io/DeadLetterQueueWriter.java | 10 +-- ...DeadLetterQueueWriterAgeRetentionTest.java | 23 ++--- .../common/io/DeadLetterQueueWriterTest.java | 87 +++++++++---------- 3 files changed, 61 insertions(+), 59 deletions(-) diff --git a/logstash-core/src/main/java/org/logstash/common/io/DeadLetterQueueWriter.java b/logstash-core/src/main/java/org/logstash/common/io/DeadLetterQueueWriter.java index 5aea07d220..d4dfc3af67 100644 --- a/logstash-core/src/main/java/org/logstash/common/io/DeadLetterQueueWriter.java +++ b/logstash-core/src/main/java/org/logstash/common/io/DeadLetterQueueWriter.java @@ -177,13 +177,13 @@ interface SchedulerService { private static class FixedRateScheduler implements SchedulerService { private final ScheduledExecutorService scheduledExecutor; - private final long checkIntervalMs; + private final Duration flushCheckInterval; - FixedRateScheduler(final long flushCheckInterval, final String pipelineId) { + FixedRateScheduler(final Duration flushCheckInterval, final String pipelineId) { //Set the name with pipeline ID for better visibility final String threadName = pipelineId != null ? "dlq-flush-check[" + pipelineId + "]" : "dlq-flush-check"; - this.checkIntervalMs = flushCheckInterval; + this.flushCheckInterval = flushCheckInterval; scheduledExecutor = Executors.newScheduledThreadPool(1, r -> { Thread t = new Thread(r); //Allow this thread to die when the JVM dies @@ -195,7 +195,7 @@ private static class FixedRateScheduler implements SchedulerService { @Override public void repeatedAction(Runnable action) { - scheduledExecutor.scheduleAtFixedRate(action, checkIntervalMs, checkIntervalMs, TimeUnit.MILLISECONDS); + scheduledExecutor.scheduleAtFixedRate(action, flushCheckInterval.toMillis(), flushCheckInterval.toMillis(), TimeUnit.MILLISECONDS); } @Override @@ -283,7 +283,7 @@ public DeadLetterQueueWriter build() throws IOException { } else { if (startScheduledFlusher) { final Duration normalizedFlushCheckInterval = normalizeFlushCheckInterval(flushCheckInterval, normalizedFlushInterval); - schedulerService = new FixedRateScheduler(normalizedFlushCheckInterval.toMillis(), pipelineId); + schedulerService = new FixedRateScheduler(normalizedFlushCheckInterval, pipelineId); } else { schedulerService = new NoopScheduler(); } diff --git a/logstash-core/src/test/java/org/logstash/common/io/DeadLetterQueueWriterAgeRetentionTest.java b/logstash-core/src/test/java/org/logstash/common/io/DeadLetterQueueWriterAgeRetentionTest.java index ce3c934444..95d851ce04 100644 --- a/logstash-core/src/test/java/org/logstash/common/io/DeadLetterQueueWriterAgeRetentionTest.java +++ b/logstash-core/src/test/java/org/logstash/common/io/DeadLetterQueueWriterAgeRetentionTest.java @@ -66,6 +66,9 @@ void executeAction() { private SynchronizedScheduledService synchScheduler; + private static final long MAX_SEGMENT_SIZE = 10 * MB; + private static final long MAX_QUEUE_SIZE = 1 * GB; + @Before public void setUp() throws Exception { dir = temporaryFolder.newFolder().toPath(); @@ -73,6 +76,11 @@ public void setUp() throws Exception { synchScheduler = new SynchronizedScheduledService(); } + private static DeadLetterQueueWriter.Builder newBuilder(final Path queuePath) { + return DeadLetterQueueWriter + .newBuilder(queuePath, MAX_SEGMENT_SIZE, MAX_QUEUE_SIZE, Duration.ofSeconds(1), Duration.ofSeconds(1)); + } + @Test public void testRemovesOlderSegmentsWhenWriteOnReopenedDLQContainingExpiredSegments() throws IOException { final Event event = DeadLetterQueueReaderTest.createEventWithConstantSerializationOverhead(Collections.emptyMap()); @@ -85,8 +93,7 @@ public void testRemovesOlderSegmentsWhenWriteOnReopenedDLQContainingExpiredSegme // Exercise final long prevQueueSize; final long beheadedQueueSize; - try (DeadLetterQueueWriter writeManager = DeadLetterQueueWriter - .newBuilder(dir, 10 * MB, 1 * GB, Duration.ofSeconds(1), Duration.ofSeconds(1)) + try (DeadLetterQueueWriter writeManager = newBuilder(dir) .retentionTime(Duration.ofDays(2)) .clock(fakeClock) .build()) { @@ -109,8 +116,7 @@ private void prepareDLQWithFirstSegmentOlderThanRetainPeriod(Event event, Clock final Duration littleMoreThanRetainedPeriod = retainedPeriod.plusMinutes(1); long startTime = fakeClock.instant().minus(littleMoreThanRetainedPeriod).toEpochMilli(); int messageSize = 0; - try (DeadLetterQueueWriter writeManager = DeadLetterQueueWriter - .newBuilder(dir, 10 * MB, 1 * GB, Duration.ofSeconds(1), Duration.ofSeconds(1)) + try (DeadLetterQueueWriter writeManager = newBuilder(dir) .retentionTime(retainedPeriod) .clock(fakeClock) .build()) { @@ -136,8 +142,7 @@ public void testRemovesOlderSegmentsWhenWritesIntoDLQContainingExpiredSegments() int messageSize = 0; final Duration retention = Duration.ofDays(2); - try (DeadLetterQueueWriter writeManager = DeadLetterQueueWriter - .newBuilder(dir, 10 * MB, 1 * GB, Duration.ofSeconds(1), Duration.ofSeconds(1)) + try (DeadLetterQueueWriter writeManager = newBuilder(dir) .retentionTime(retention) .clock(fakeClock) .build()) { @@ -179,8 +184,7 @@ public void testRemoveMultipleOldestSegmentsWhenRetainedAgeIsExceeded() throws I int messageSize = 0; final Duration retention = Duration.ofDays(2); - try (DeadLetterQueueWriter writeManager = DeadLetterQueueWriter - .newBuilder(dir, 10 * MB, 1 * GB, Duration.ofSeconds(1), Duration.ofSeconds(1)) + try (DeadLetterQueueWriter writeManager = newBuilder(dir) .retentionTime(retention) .clock(fakeClock) .build()) { @@ -272,8 +276,7 @@ public void testDLQWriterFlusherRemovesExpiredSegmentWhenCurrentWriterIsStale() final ManualAdvanceClock fakeClock = new ManualAdvanceClock(ZoneId.of("Europe/Rome")); Duration retainedPeriod = Duration.ofDays(1); - try (DeadLetterQueueWriter writeManager = DeadLetterQueueWriter - .newBuilder(dir, 10 * MB, 1 * GB, Duration.ofSeconds(1), Duration.ofSeconds(1)) + try (DeadLetterQueueWriter writeManager = newBuilder(dir) .retentionTime(retainedPeriod) .clock(fakeClock) .build()) { diff --git a/logstash-core/src/test/java/org/logstash/common/io/DeadLetterQueueWriterTest.java b/logstash-core/src/test/java/org/logstash/common/io/DeadLetterQueueWriterTest.java index 202694f097..e6f7d9e64e 100644 --- a/logstash-core/src/test/java/org/logstash/common/io/DeadLetterQueueWriterTest.java +++ b/logstash-core/src/test/java/org/logstash/common/io/DeadLetterQueueWriterTest.java @@ -72,11 +72,26 @@ public void setUp() throws Exception { private static long EMPTY_DLQ = VERSION_SIZE; // Only the version field has been written + private static final long DEFAULT_MAX_SEGMENT_SIZE = 1_000; + private static final long DEFAULT_MAX_QUEUE_SIZE = 100_000; + + private static DeadLetterQueueWriter.Builder newBuilder(final Path queuePath) { + return newBuilder(queuePath, DEFAULT_MAX_SEGMENT_SIZE, DEFAULT_MAX_QUEUE_SIZE); + } + + private static DeadLetterQueueWriter.Builder newBuilder(final Path queuePath, final long maxSegmentSize, final long maxQueueSize) { + return newBuilder(queuePath, maxSegmentSize, maxQueueSize, Duration.ofSeconds(1)); + } + + private static DeadLetterQueueWriter.Builder newBuilder(final Path queuePath, final long maxSegmentSize, final long maxQueueSize, final Duration flushInterval) { + return DeadLetterQueueWriter + .newBuilder(queuePath, maxSegmentSize, maxQueueSize, flushInterval, Duration.ofSeconds(1)); + } + @Test public void testLockFileManagement() throws Exception { Path lockFile = dir.resolve(".lock"); - DeadLetterQueueWriter writer = DeadLetterQueueWriter - .newBuilder(dir, 1_000, 100_000, Duration.ofSeconds(1), Duration.ofSeconds(1)) + DeadLetterQueueWriter writer = newBuilder(dir) .build(); assertTrue(Files.exists(lockFile)); writer.close(); @@ -85,12 +100,10 @@ public void testLockFileManagement() throws Exception { @Test public void testFileLocking() throws Exception { - DeadLetterQueueWriter writer = DeadLetterQueueWriter - .newBuilder(dir, 1_000, 100_000, Duration.ofSeconds(1), Duration.ofSeconds(1)) + DeadLetterQueueWriter writer = newBuilder(dir) .build(); try { - DeadLetterQueueWriter - .newBuilder(dir, 100, 1_000, Duration.ofSeconds(1), Duration.ofSeconds(1)) + newBuilder(dir, 100, 1_000) .build(); fail(); } catch (LockException e) { @@ -103,8 +116,7 @@ public void testFileLocking() throws Exception { public void testUncleanCloseOfPreviousWriter() throws Exception { Path lockFilePath = dir.resolve(".lock"); boolean created = lockFilePath.toFile().createNewFile(); - DeadLetterQueueWriter writer = DeadLetterQueueWriter - .newBuilder(dir, 1_000, 100_000, Duration.ofSeconds(1), Duration.ofSeconds(1)) + DeadLetterQueueWriter writer = newBuilder(dir) .build(); FileChannel channel = FileChannel.open(lockFilePath, StandardOpenOption.WRITE); @@ -120,8 +132,7 @@ public void testUncleanCloseOfPreviousWriter() throws Exception { @Test public void testWrite() throws Exception { - DeadLetterQueueWriter writer = DeadLetterQueueWriter - .newBuilder(dir, 1_000, 100_000, Duration.ofSeconds(1), Duration.ofSeconds(1)) + DeadLetterQueueWriter writer = newBuilder(dir) .build(); DLQEntry entry = new DLQEntry(new Event(), "type", "id", "reason"); writer.writeEntry(entry); @@ -135,8 +146,7 @@ public void testDoesNotWriteMessagesAlreadyRoutedThroughDLQ() throws Exception { DLQEntry entry = new DLQEntry(new Event(), "type", "id", "reason"); DLQEntry dlqEntry = new DLQEntry(dlqEvent, "type", "id", "reason"); - try (DeadLetterQueueWriter writer = DeadLetterQueueWriter - .newBuilder(dir, 1_000, 100_000, Duration.ofSeconds(1), Duration.ofSeconds(1)) + try (DeadLetterQueueWriter writer = newBuilder(dir) .build()) { writer.writeEntry(entry); long dlqLengthAfterEvent = dlqLength(); @@ -156,8 +166,7 @@ public void testDoesNotWriteBeyondLimit() throws Exception { long MAX_QUEUE_LENGTH = payloadLength * MESSAGE_COUNT; - try (DeadLetterQueueWriter writer = DeadLetterQueueWriter - .newBuilder(dir, payloadLength, MAX_QUEUE_LENGTH, Duration.ofSeconds(1), Duration.ofSeconds(1)) + try (DeadLetterQueueWriter writer = newBuilder(dir, payloadLength, MAX_QUEUE_LENGTH) .build()) { for (int i = 0; i < MESSAGE_COUNT; i++) @@ -174,8 +183,7 @@ public void testDoesNotWriteBeyondLimit() throws Exception { @Test public void testSlowFlush() throws Exception { - try (DeadLetterQueueWriter writer = DeadLetterQueueWriter - .newBuilder(dir, 1_000, 1_000_000, Duration.ofSeconds(1), Duration.ofSeconds(1)) + try (DeadLetterQueueWriter writer = newBuilder(dir, 1_000, 1_000_000) .build()) { DLQEntry entry = new DLQEntry(new Event(), "type", "id", "1"); writer.writeEntry(entry); @@ -196,8 +204,7 @@ public void testSlowFlush() throws Exception { @Test public void testNotFlushed() throws Exception { - try (DeadLetterQueueWriter writeManager = DeadLetterQueueWriter - .newBuilder(dir, BLOCK_SIZE, 1_000_000_000, Duration.ofSeconds(5), Duration.ofSeconds(1)) + try (DeadLetterQueueWriter writeManager = newBuilder(dir, BLOCK_SIZE, 1_000_000_000, Duration.ofSeconds(5)) .build()) { for (int i = 0; i < 4; i++) { DLQEntry entry = new DLQEntry(new Event(), "type", "id", "1"); @@ -219,8 +226,7 @@ public void testNotFlushed() throws Exception { @Test public void testCloseFlush() throws Exception { - try (DeadLetterQueueWriter writer = DeadLetterQueueWriter - .newBuilder(dir, 1_000, 1_000_000, Duration.ofHours(1), Duration.ofSeconds(1)) + try (DeadLetterQueueWriter writer = newBuilder(dir, 1_000, 1_000_000, Duration.ofHours(1)) .build()) { DLQEntry entry = new DLQEntry(new Event(), "type", "id", "1"); writer.writeEntry(entry); @@ -255,8 +261,7 @@ public void testRemoveOldestSegmentWhenRetainedSizeIsExceededAndDropOlderModeIsE long startTime = System.currentTimeMillis(); int messageSize = 0; - try (DeadLetterQueueWriter writeManager = DeadLetterQueueWriter - .newBuilder(dir, 10 * MB, 20 * MB, Duration.ofSeconds(1), Duration.ofSeconds(1)) + try (DeadLetterQueueWriter writeManager = newBuilder(dir, 10 * MB, 20 * MB) .build()) { // 320 generates 10 Mb of data @@ -287,8 +292,7 @@ public void testRemoveOldestSegmentWhenRetainedSizeIsExceededAndDropOlderModeIsE final long prevQueueSize; final long beheadedQueueSize; long droppedEvent; - try (DeadLetterQueueWriter writeManager = DeadLetterQueueWriter - .newBuilder(dir, 10 * MB, 20 * MB, Duration.ofSeconds(1), Duration.ofSeconds(1)) + try (DeadLetterQueueWriter writeManager = newBuilder(dir, 10 * MB, 20 * MB) .storageType(QueueStorageType.DROP_OLDER) .build()) { prevQueueSize = writeManager.getCurrentQueueSize(); @@ -325,8 +329,7 @@ public void testRemoveOldestSegmentWhenRetainedSizeIsExceededAndDropOlderModeIsE @Test public void testRemoveSegmentsOrder() throws IOException { - try (DeadLetterQueueWriter sut = DeadLetterQueueWriter - .newBuilder(dir, 10 * MB, 20 * MB, Duration.ofSeconds(1), Duration.ofSeconds(1)) + try (DeadLetterQueueWriter sut = newBuilder(dir, 10 * MB, 20 * MB) .build()) { // create some segments files Files.createFile(dir.resolve("9.log")); @@ -495,8 +498,7 @@ public void testDropEventCountCorrectlyNotEnqueuedEvents() throws IOException, I Event bigEvent = DeadLetterQueueReaderTest.createEventWithConstantSerializationOverhead(Collections.emptyMap()); bigEvent.setField("message", DeadLetterQueueReaderTest.generateMessageContent(2 * BLOCK_SIZE)); - try (DeadLetterQueueWriter writeManager = DeadLetterQueueWriter - .newBuilder(dir, 10 * MB, 20 * MB, Duration.ofSeconds(1), Duration.ofSeconds(1)) + try (DeadLetterQueueWriter writeManager = newBuilder(dir, 10 * MB, 20 * MB) .build()) { // enqueue a record with size smaller than BLOCK_SIZE DLQEntry entry = new DLQEntry(blockAlmostFullEvent, "", "", "00001", DeadLetterQueueReaderTest.constantSerializationLengthTimestamp(System.currentTimeMillis())); @@ -513,8 +515,7 @@ public void testDropEventCountCorrectlyNotEnqueuedEvents() throws IOException, I // fill the queue to push out the segment with the 2 previous events Event event = DeadLetterQueueReaderTest.createEventWithConstantSerializationOverhead(Collections.emptyMap()); event.setField("message", DeadLetterQueueReaderTest.generateMessageContent(32500)); - try (DeadLetterQueueWriter writeManager = DeadLetterQueueWriter - .newBuilder(dir, 10 * MB, 20 * MB, Duration.ofSeconds(1), Duration.ofSeconds(1)) + try (DeadLetterQueueWriter writeManager = newBuilder(dir, 10 * MB, 20 * MB) .storageType(QueueStorageType.DROP_NEWER) .build()) { @@ -543,16 +544,14 @@ public void testDropEventCountCorrectlyNotEnqueuedEvents() throws IOException, I public void testInitializeWriterWith1ByteEntry() throws Exception { Files.write(dir.resolve("1.log"), "1".getBytes()); - DeadLetterQueueWriter writer = DeadLetterQueueWriter - .newBuilder(dir, 1_000, 100_000, Duration.ofSeconds(1), Duration.ofSeconds(1)) + DeadLetterQueueWriter writer = newBuilder(dir) .build(); writer.close(); } @Test public void givenDLQWriterCreatedSomeSegmentsWhenReaderWithCleanConsumedNotifyTheDeletionOfSomeThenWriterUpdatesItsMetricsSize() throws IOException, InterruptedException { - try (DeadLetterQueueWriter writer = DeadLetterQueueWriter - .newBuilder(dir, 1 * MB, 100 * MB, Duration.ofSeconds(1), Duration.ofSeconds(1)) + try (DeadLetterQueueWriter writer = newBuilder(dir, 1 * MB, 100 * MB) .build()) { // fill at least 3 segments @@ -617,22 +616,22 @@ private long countDlqSegments(Path dir) throws IOException { } @Test - public void givenFlushIntervalGreatherThanMinimumWhenNormalizedThenRemainsUnmodified() { - DeadLetterQueueWriter.Builder builder = DeadLetterQueueWriter.newBuilder(dir, 1_000, 100_000, Duration.ofSeconds(5), Duration.ofSeconds(1)); + public void givenFlushIntervalGreaterThanMinimumWhenNormalizedThenRemainsUnmodified() { + DeadLetterQueueWriter.Builder builder = newBuilder(dir); Duration result = builder.normalizeFlushInterval(Duration.ofSeconds(10)); assertEquals("Valid flush interval should remain unchanged", Duration.ofSeconds(10), result); } @Test public void givenFlushIntervalBelowTheMinimumWhenNormalizedThenIsClampedToMinimum() { - DeadLetterQueueWriter.Builder builder = DeadLetterQueueWriter.newBuilder(dir, 1_000, 100_000, Duration.ofSeconds(5), Duration.ofSeconds(1)); + DeadLetterQueueWriter.Builder builder = newBuilder(dir); Duration result = builder.normalizeFlushInterval(Duration.ofMillis(100)); assertEquals("Flush interval below 1s should be clamped to 1s", Duration.ofSeconds(1), result); } @Test public void testNormalizeFlushCheckIntervalWithinLimits() { - DeadLetterQueueWriter.Builder builder = DeadLetterQueueWriter.newBuilder(dir, 1_000, 100_000, Duration.ofSeconds(5), Duration.ofSeconds(1)); + DeadLetterQueueWriter.Builder builder = newBuilder(dir); Duration flushInterval = Duration.ofSeconds(5); Duration flushCheckInterval = Duration.ofSeconds(2); Duration result = builder.normalizeFlushCheckInterval(flushCheckInterval, flushInterval); @@ -640,8 +639,8 @@ public void testNormalizeFlushCheckIntervalWithinLimits() { } @Test - public void testNormalizeFlushCheckIntervalBelowMinimum() { - DeadLetterQueueWriter.Builder builder = DeadLetterQueueWriter.newBuilder(dir, 1_000, 100_000, Duration.ofSeconds(5), Duration.ofSeconds(1)); + public void givenFlushCheckIntervalBelowMinimumWhenNormalizedThenClampedToMinimum() { + DeadLetterQueueWriter.Builder builder = newBuilder(dir); Duration flushInterval = Duration.ofSeconds(5); Duration belowMinimum = Duration.ofMillis(500); Duration result = builder.normalizeFlushCheckInterval(belowMinimum, flushInterval); @@ -649,8 +648,8 @@ public void testNormalizeFlushCheckIntervalBelowMinimum() { } @Test - public void testNormalizeFlushCheckIntervalExceedsFlushInterval() { - DeadLetterQueueWriter.Builder builder = DeadLetterQueueWriter.newBuilder(dir, 1_000, 100_000, Duration.ofSeconds(5), Duration.ofSeconds(1)); + public void givenFlushCheckIntervalExceedsFlushIntervalWhenNormalizedThenClampedToFlushInterval() { + DeadLetterQueueWriter.Builder builder = newBuilder(dir); Duration flushInterval = Duration.ofSeconds(3); Duration aboveFlushInterval = Duration.ofSeconds(5); Duration result = builder.normalizeFlushCheckInterval(aboveFlushInterval, flushInterval); @@ -658,8 +657,8 @@ public void testNormalizeFlushCheckIntervalExceedsFlushInterval() { } @Test - public void testNormalizeFlushCheckIntervalJustBelowFlushInterval() { - DeadLetterQueueWriter.Builder builder = DeadLetterQueueWriter.newBuilder(dir, 1_000, 100_000, Duration.ofSeconds(5), Duration.ofSeconds(1)); + public void givenFlushCheckIntervalJustBelowFlushIntervalWhenNormalizedThenAccepted() { + DeadLetterQueueWriter.Builder builder = newBuilder(dir); Duration flushInterval = Duration.ofSeconds(5); Duration justBelowFlushInterval = Duration.ofMillis(4900); Duration result = builder.normalizeFlushCheckInterval(justBelowFlushInterval, flushInterval);