diff --git a/config/logstash.yml b/config/logstash.yml index d2903c47fd..05615b9402 100644 --- a/config/logstash.yml +++ b/config/logstash.yml @@ -271,6 +271,14 @@ # # dead_letter_queue.flush_interval: 5000 +# If using dead_letter_queue.enable: true, the interval in milliseconds that the DLQ scheduler checks for stale segments +# to be flushed. A smaller value ensures faster segment rotation at the cost of CPU with more frequent scheduler runs. +# A larger value reduces scheduler overhead but may delay segment sealing. +# Minimum value is 1000 and cannot be greater than dead_letter_queue.flush_interval. +# Default is 1000. +# +# dead_letter_queue.flush_check_interval: 1000 + # If using dead_letter_queue.enable: true, controls which entries should be dropped to avoid exceeding the size limit. # Set the value to `drop_newer` (default) to stop accepting new events that would push the DLQ size over the limit. # Set the value to `drop_older` to remove queue pages containing the oldest events to make space for new ones. diff --git a/docs/reference/dead-letter-queues.md b/docs/reference/dead-letter-queues.md index c1e35afb65..7752e37d13 100644 --- a/docs/reference/dead-letter-queues.md +++ b/docs/reference/dead-letter-queues.md @@ -78,8 +78,14 @@ Note that this value cannot be set to lower than 1000ms. dead_letter_queue.flush_interval: 5000 ``` +Stale segments files are periodically checked if they need to be flushed. This period is controlled by the `dead_letter_queue.flush_check_interval` setting. This setting is in milliseconds, and defaults to 1000ms. A smaller value ensures faster segment rotation when infrequent writes occur, at the cost of CPU consumption with more frequent segment checks execution. A larger value reduces checks overhead but delays segment sealing, for the worst case it will be `dead_letter_queue.flush_interval` + `dead_letter_queue.flush_check_interval`. This value cannot be set to lower than 1000ms. + +```yaml +dead_letter_queue.flush_check_interval: 1000 +``` + ::::{note} -You may not use the same `dead_letter_queue` path for two different Logstash instances. +You cannot use the same `dead_letter_queue` path for two different Logstash instances. :::: diff --git a/logstash-core/lib/logstash/environment.rb b/logstash-core/lib/logstash/environment.rb index 5ceeca979f..c5bf204035 100644 --- a/logstash-core/lib/logstash/environment.rb +++ b/logstash-core/lib/logstash/environment.rb @@ -102,6 +102,7 @@ def self.as_java_range(r) Setting::BooleanSetting.new("dead_letter_queue.enable", false), Setting::BytesSetting.new("dead_letter_queue.max_bytes", "1024mb"), Setting::NumericSetting.new("dead_letter_queue.flush_interval", 5000), + Setting::NumericSetting.new("dead_letter_queue.flush_check_interval", 1000), Setting::StringSetting.new("dead_letter_queue.storage_policy", "drop_newer", true, ["drop_newer", "drop_older"]), Setting::NullableStringSetting.new("dead_letter_queue.retain.age"), # example 5d Setting::TimeValueSetting.new("slowlog.threshold.warn", "-1"), diff --git a/logstash-core/lib/logstash/settings.rb b/logstash-core/lib/logstash/settings.rb index e706792027..5e7021cde9 100644 --- a/logstash-core/lib/logstash/settings.rb +++ b/logstash-core/lib/logstash/settings.rb @@ -46,6 +46,7 @@ def self.included(base) "config.string", "dead_letter_queue.enable", "dead_letter_queue.flush_interval", + "dead_letter_queue.flush_check_interval", "dead_letter_queue.max_bytes", "dead_letter_queue.storage_policy", "dead_letter_queue.retain.age", diff --git a/logstash-core/src/main/java/org/logstash/common/DeadLetterQueueFactory.java b/logstash-core/src/main/java/org/logstash/common/DeadLetterQueueFactory.java index 16fbc0c5b2..2b1a476a96 100644 --- a/logstash-core/src/main/java/org/logstash/common/DeadLetterQueueFactory.java +++ b/logstash-core/src/main/java/org/logstash/common/DeadLetterQueueFactory.java @@ -76,15 +76,16 @@ private DeadLetterQueueFactory() { * @param maxQueueSize Maximum size of the dead letter queue (in bytes). No entries will be written * that would make the size of this dlq greater than this value * @param flushInterval Maximum duration between flushes of dead letter queue files if no data is sent. + * @param flushCheckInterval The interval between scheduler checks for stale segments. * @param storageType overwriting type in case of queue full: drop_older or drop_newer. * @return write manager for the specific id's dead-letter-queue context */ - public static DeadLetterQueueWriter getWriter(String id, String dlqPath, long maxQueueSize, Duration flushInterval, QueueStorageType storageType) { - return REGISTRY.computeIfAbsent(id, key -> newWriter(key, dlqPath, maxQueueSize, flushInterval, storageType)); + public static DeadLetterQueueWriter getWriter(String id, String dlqPath, long maxQueueSize, Duration flushInterval, Duration flushCheckInterval, QueueStorageType storageType) { + return REGISTRY.computeIfAbsent(id, key -> newWriter(key, dlqPath, maxQueueSize, flushInterval, flushCheckInterval, storageType)); } /** - * Like {@link #getWriter(String, String, long, Duration, QueueStorageType)} but also setting the age duration + * Like {@link #getWriter(String, String, long, Duration, Duration, QueueStorageType)} but also setting the age duration * of the segments. * * @param id The identifier context for this dlq manager @@ -93,12 +94,13 @@ public static DeadLetterQueueWriter getWriter(String id, String dlqPath, long ma * @param maxQueueSize Maximum size of the dead letter queue (in bytes). No entries will be written * that would make the size of this dlq greater than this value * @param flushInterval Maximum duration between flushes of dead letter queue files if no data is sent. + * @param flushCheckInterval The interval between scheduler checks for stale segments. * @param storageType overwriting type in case of queue full: drop_older or drop_newer. * @param age the period that DLQ events should be considered as valid, before automatic removal. * @return write manager for the specific id's dead-letter-queue context * */ - public static DeadLetterQueueWriter getWriter(String id, String dlqPath, long maxQueueSize, Duration flushInterval, QueueStorageType storageType, Duration age) { - return REGISTRY.computeIfAbsent(id, key -> newWriter(key, dlqPath, maxQueueSize, flushInterval, storageType, age)); + public static DeadLetterQueueWriter getWriter(String id, String dlqPath, long maxQueueSize, Duration flushInterval, Duration flushCheckInterval, QueueStorageType storageType, Duration age) { + return REGISTRY.computeIfAbsent(id, key -> newWriter(key, dlqPath, maxQueueSize, flushInterval, flushCheckInterval, storageType, age)); } public static DeadLetterQueueWriter release(String id) { @@ -106,10 +108,11 @@ public static DeadLetterQueueWriter release(String id) { } private static DeadLetterQueueWriter newWriter(final String id, final String dlqPath, final long maxQueueSize, - final Duration flushInterval, final QueueStorageType storageType) { + final Duration flushInterval, final Duration flushCheckInterval, final QueueStorageType storageType) { try { return DeadLetterQueueWriter - .newBuilder(Paths.get(dlqPath, id), MAX_SEGMENT_SIZE_BYTES, maxQueueSize, flushInterval) + .newBuilder(Paths.get(dlqPath, id), MAX_SEGMENT_SIZE_BYTES, maxQueueSize, flushInterval, flushCheckInterval) + .pipelineId(id) .storageType(storageType) .build(); } catch (IOException e) { @@ -119,11 +122,12 @@ private static DeadLetterQueueWriter newWriter(final String id, final String dlq } private static DeadLetterQueueWriter newWriter(final String id, final String dlqPath, final long maxQueueSize, - final Duration flushInterval, final QueueStorageType storageType, + final Duration flushInterval, final Duration flushCheckInterval, final QueueStorageType storageType, final Duration age) { try { return DeadLetterQueueWriter - .newBuilder(Paths.get(dlqPath, id), MAX_SEGMENT_SIZE_BYTES, maxQueueSize, flushInterval) + .newBuilder(Paths.get(dlqPath, id), MAX_SEGMENT_SIZE_BYTES, maxQueueSize, flushInterval, flushCheckInterval) + .pipelineId(id) .storageType(storageType) .retentionTime(age) .build(); diff --git a/logstash-core/src/main/java/org/logstash/common/io/DeadLetterQueueWriter.java b/logstash-core/src/main/java/org/logstash/common/io/DeadLetterQueueWriter.java index 23c4e7e8f2..d4dfc3af67 100644 --- a/logstash-core/src/main/java/org/logstash/common/io/DeadLetterQueueWriter.java +++ b/logstash-core/src/main/java/org/logstash/common/io/DeadLetterQueueWriter.java @@ -124,6 +124,9 @@ public String toString() { } } + static final Duration MIN_FLUSH_PERIOD = Duration.ofMillis(1000); + static final Duration MIN_FLUSH_CHECK_INTERVAL = Duration.ofMillis(1000); + @VisibleForTesting static final String SEGMENT_FILE_PATTERN = "%d.log"; private static final Logger logger = LogManager.getLogger(DeadLetterQueueWriter.class); @@ -174,21 +177,25 @@ interface SchedulerService { private static class FixedRateScheduler implements SchedulerService { private final ScheduledExecutorService scheduledExecutor; + private final Duration flushCheckInterval; - FixedRateScheduler() { + FixedRateScheduler(final Duration flushCheckInterval, final String pipelineId) { + //Set the name with pipeline ID for better visibility + final String threadName = pipelineId != null ? "dlq-flush-check[" + pipelineId + "]" : "dlq-flush-check"; + + this.flushCheckInterval = flushCheckInterval; scheduledExecutor = Executors.newScheduledThreadPool(1, r -> { Thread t = new Thread(r); //Allow this thread to die when the JVM dies t.setDaemon(true); - //Set the name - t.setName("dlq-flush-check"); + t.setName(threadName); return t; }); } @Override public void repeatedAction(Runnable action) { - scheduledExecutor.scheduleAtFixedRate(action, 1L, 1L, TimeUnit.SECONDS); + scheduledExecutor.scheduleAtFixedRate(action, flushCheckInterval.toMillis(), flushCheckInterval.toMillis(), TimeUnit.MILLISECONDS); } @Override @@ -215,21 +222,24 @@ public static final class Builder { private final long maxSegmentSize; private final long maxQueueSize; private final Duration flushInterval; + private final Duration flushCheckInterval; private boolean startScheduledFlusher; private QueueStorageType storageType = QueueStorageType.DROP_NEWER; private Duration retentionTime = null; private Clock clock = Clock.systemDefaultZone(); private SchedulerService customSchedulerService = null; + private String pipelineId; - private Builder(Path queuePath, long maxSegmentSize, long maxQueueSize, Duration flushInterval) { - this(queuePath, maxSegmentSize, maxQueueSize, flushInterval, true); + private Builder(Path queuePath, long maxSegmentSize, long maxQueueSize, final Duration flushInterval, final Duration flushCheckInterval) { + this(queuePath, maxSegmentSize, maxQueueSize, flushInterval, flushCheckInterval, true); } - private Builder(Path queuePath, long maxSegmentSize, long maxQueueSize, Duration flushInterval, boolean startScheduledFlusher) { + private Builder(Path queuePath, long maxSegmentSize, long maxQueueSize, final Duration flushInterval, final Duration flushCheckInterval, boolean startScheduledFlusher) { this.queuePath = queuePath; this.maxSegmentSize = maxSegmentSize; this.maxQueueSize = maxQueueSize; this.flushInterval = flushInterval; + this.flushCheckInterval = flushCheckInterval; this.startScheduledFlusher = startScheduledFlusher; } @@ -243,6 +253,11 @@ public Builder retentionTime(Duration retentionTime) { return this; } + public Builder pipelineId(final String pipelineId) { + this.pipelineId = pipelineId; + return this; + } + @VisibleForTesting Builder clock(Clock clock) { this.clock = clock; @@ -259,29 +274,64 @@ public DeadLetterQueueWriter build() throws IOException { if (customSchedulerService != null && startScheduledFlusher) { throw new IllegalArgumentException("Both default scheduler and custom scheduler were defined, "); } + + final Duration normalizedFlushInterval = normalizeFlushInterval(flushInterval); + SchedulerService schedulerService; if (customSchedulerService != null) { schedulerService = customSchedulerService; } else { if (startScheduledFlusher) { - schedulerService = new FixedRateScheduler(); + final Duration normalizedFlushCheckInterval = normalizeFlushCheckInterval(flushCheckInterval, normalizedFlushInterval); + schedulerService = new FixedRateScheduler(normalizedFlushCheckInterval, pipelineId); } else { schedulerService = new NoopScheduler(); } } - return new DeadLetterQueueWriter(queuePath, maxSegmentSize, maxQueueSize, flushInterval, storageType, retentionTime, clock, schedulerService); + return new DeadLetterQueueWriter(queuePath, maxSegmentSize, maxQueueSize, normalizedFlushInterval, storageType, retentionTime, clock, schedulerService); + } + + @VisibleForTesting + Duration normalizeFlushInterval(final Duration flushInterval) { + if (!startScheduledFlusher) return flushInterval; + + Duration effectiveFlushInterval = flushInterval; + if (flushInterval.compareTo(MIN_FLUSH_PERIOD) < 0) { + logger.warn("dead_letter_queue.flush_interval ({} ms) is below the minimum of {} ms; using {} ms", + flushInterval.toMillis(), MIN_FLUSH_PERIOD.toMillis(), MIN_FLUSH_PERIOD.toMillis()); + effectiveFlushInterval = MIN_FLUSH_PERIOD; + } + return effectiveFlushInterval; + } + + @VisibleForTesting + Duration normalizeFlushCheckInterval(final Duration flushCheckInterval, final Duration effectiveFlushInterval) { + Duration effectiveFlushCheckInterval = flushCheckInterval; + // can't exceed flush interval + if (effectiveFlushCheckInterval.compareTo(effectiveFlushInterval) > 0) { + logger.warn("dead_letter_queue.flush_check_interval ({} ms) cannot be greater than dead_letter_queue.flush_interval ({} ms); using {} ms", + effectiveFlushCheckInterval.toMillis(), effectiveFlushInterval.toMillis(), effectiveFlushInterval.toMillis()); + effectiveFlushCheckInterval = effectiveFlushInterval; + } + // can't be less than 1s + if (effectiveFlushCheckInterval.compareTo(MIN_FLUSH_CHECK_INTERVAL) < 0) { + logger.warn("dead_letter_queue.flush_check_interval ({} ms) is below the minimum of {} ms; using {} ms for the flush check interval", + effectiveFlushCheckInterval.toMillis(), MIN_FLUSH_CHECK_INTERVAL.toMillis(), MIN_FLUSH_CHECK_INTERVAL.toMillis()); + effectiveFlushCheckInterval = MIN_FLUSH_CHECK_INTERVAL; + } + return effectiveFlushCheckInterval; } } public static Builder newBuilder(final Path queuePath, final long maxSegmentSize, final long maxQueueSize, - final Duration flushInterval) { - return new Builder(queuePath, maxSegmentSize, maxQueueSize, flushInterval); + final Duration flushInterval, final Duration flushCheckInterval) { + return new Builder(queuePath, maxSegmentSize, maxQueueSize, flushInterval, flushCheckInterval); } @VisibleForTesting static Builder newBuilderWithoutFlusher(final Path queuePath, final long maxSegmentSize, final long maxQueueSize) { - return new Builder(queuePath, maxSegmentSize, maxQueueSize, Duration.ZERO, false); + return new Builder(queuePath, maxSegmentSize, maxQueueSize, Duration.ZERO, Duration.ZERO, false); } private DeadLetterQueueWriter(final Path queuePath, final long maxSegmentSize, final long maxQueueSize, diff --git a/logstash-core/src/main/java/org/logstash/common/io/RecordIOWriter.java b/logstash-core/src/main/java/org/logstash/common/io/RecordIOWriter.java index 34d3b5024f..d529e97e34 100644 --- a/logstash-core/src/main/java/org/logstash/common/io/RecordIOWriter.java +++ b/logstash-core/src/main/java/org/logstash/common/io/RecordIOWriter.java @@ -173,7 +173,7 @@ public boolean hasWritten(){ return lastWrite != null; } - public boolean isStale(Duration flushPeriod){ + public boolean isStale(final Duration flushPeriod) { return hasWritten() && Instant.now().minus(flushPeriod).isAfter(lastWrite); } diff --git a/logstash-core/src/main/java/org/logstash/execution/AbstractPipelineExt.java b/logstash-core/src/main/java/org/logstash/execution/AbstractPipelineExt.java index 3572271064..e47d855692 100644 --- a/logstash-core/src/main/java/org/logstash/execution/AbstractPipelineExt.java +++ b/logstash-core/src/main/java/org/logstash/execution/AbstractPipelineExt.java @@ -429,15 +429,16 @@ private DeadLetterQueueWriter createDeadLetterQueueWriterFromSettings(ThreadCont String dlqPath = getSetting(context, "path.dead_letter_queue").asJavaString(); long dlqMaxBytes = org.jruby.RubyNumeric.num2long(getSetting(context, "dead_letter_queue.max_bytes").convertToInteger()); Duration dlqFlushInterval = Duration.ofMillis(org.jruby.RubyNumeric.num2long(getSetting(context, "dead_letter_queue.flush_interval").convertToInteger())); + Duration dlqFlushCheckInterval = Duration.ofMillis(org.jruby.RubyNumeric.num2long(getSetting(context, "dead_letter_queue.flush_check_interval").convertToInteger())); if (hasSetting(context, "dead_letter_queue.retain.age") && !getSetting(context, "dead_letter_queue.retain.age").isNil()) { // convert to Duration final Duration age = parseToDuration(getSetting(context, "dead_letter_queue.retain.age").convertToString().toString()); return DeadLetterQueueFactory.getWriter(pipelineId.asJavaString(), dlqPath, dlqMaxBytes, - dlqFlushInterval, storageType, age); + dlqFlushInterval, dlqFlushCheckInterval, storageType, age); } - return DeadLetterQueueFactory.getWriter(pipelineId.asJavaString(), dlqPath, dlqMaxBytes, dlqFlushInterval, storageType); + return DeadLetterQueueFactory.getWriter(pipelineId.asJavaString(), dlqPath, dlqMaxBytes, dlqFlushInterval, dlqFlushCheckInterval, storageType); } /** diff --git a/logstash-core/src/test/java/org/logstash/common/AbstractDeadLetterQueueWriterExtTest.java b/logstash-core/src/test/java/org/logstash/common/AbstractDeadLetterQueueWriterExtTest.java index 335dec971f..8e87bf7eb1 100644 --- a/logstash-core/src/test/java/org/logstash/common/AbstractDeadLetterQueueWriterExtTest.java +++ b/logstash-core/src/test/java/org/logstash/common/AbstractDeadLetterQueueWriterExtTest.java @@ -60,7 +60,7 @@ private void writeAnEventIntoDLQ(Path dlqPath, String pluginId, String pluginTyp RubyString id = RubyString.newString(RubyUtil.RUBY, pluginId); RubyString classConfigName = RubyString.newString(RubyUtil.RUBY, pluginType); - final DeadLetterQueueWriter javaDlqWriter = DeadLetterQueueFactory.getWriter(dlqName, dlqPath.toString(), 1024 * 1024, Duration.ofHours(1), QueueStorageType.DROP_NEWER); + final DeadLetterQueueWriter javaDlqWriter = DeadLetterQueueFactory.getWriter(dlqName, dlqPath.toString(), 1024 * 1024, Duration.ofHours(1), Duration.ofSeconds(1), QueueStorageType.DROP_NEWER); IRubyObject dlqWriter = JavaUtil.convertJavaToUsableRubyObject(context.runtime, javaDlqWriter); final AbstractDeadLetterQueueWriterExt dlqWriterForInstance = new AbstractDeadLetterQueueWriterExt.PluginDeadLetterQueueWriterExt( diff --git a/logstash-core/src/test/java/org/logstash/common/DeadLetterQueueFactoryTest.java b/logstash-core/src/test/java/org/logstash/common/DeadLetterQueueFactoryTest.java index cf85824d7a..6802d97849 100644 --- a/logstash-core/src/test/java/org/logstash/common/DeadLetterQueueFactoryTest.java +++ b/logstash-core/src/test/java/org/logstash/common/DeadLetterQueueFactoryTest.java @@ -69,9 +69,9 @@ public void setUp() throws Exception { public void testSameBeforeRelease() throws IOException { try { Path pipelineA = dir.resolve(PIPELINE_NAME); - DeadLetterQueueWriter writer = DeadLetterQueueFactory.getWriter(PIPELINE_NAME, pipelineA.toString(), 10000, Duration.ofSeconds(1), QueueStorageType.DROP_NEWER); + DeadLetterQueueWriter writer = DeadLetterQueueFactory.getWriter(PIPELINE_NAME, pipelineA.toString(), 10000, Duration.ofSeconds(1), Duration.ofSeconds(1), QueueStorageType.DROP_NEWER); assertTrue(writer.isOpen()); - DeadLetterQueueWriter writer2 = DeadLetterQueueFactory.getWriter(PIPELINE_NAME, pipelineA.toString(), 10000, Duration.ofSeconds(1), QueueStorageType.DROP_NEWER); + DeadLetterQueueWriter writer2 = DeadLetterQueueFactory.getWriter(PIPELINE_NAME, pipelineA.toString(), 10000, Duration.ofSeconds(1), Duration.ofSeconds(1), QueueStorageType.DROP_NEWER); assertSame(writer, writer2); writer.close(); } finally { @@ -83,11 +83,11 @@ public void testSameBeforeRelease() throws IOException { public void testOpenableAfterRelease() throws IOException { try { Path pipelineA = dir.resolve(PIPELINE_NAME); - DeadLetterQueueWriter writer = DeadLetterQueueFactory.getWriter(PIPELINE_NAME, pipelineA.toString(), 10000, Duration.ofSeconds(1), QueueStorageType.DROP_NEWER); + DeadLetterQueueWriter writer = DeadLetterQueueFactory.getWriter(PIPELINE_NAME, pipelineA.toString(), 10000, Duration.ofSeconds(1), Duration.ofSeconds(1), QueueStorageType.DROP_NEWER); assertTrue(writer.isOpen()); writer.close(); DeadLetterQueueFactory.release(PIPELINE_NAME); - writer = DeadLetterQueueFactory.getWriter(PIPELINE_NAME, pipelineA.toString(), 10000, Duration.ofSeconds(1), QueueStorageType.DROP_NEWER); + writer = DeadLetterQueueFactory.getWriter(PIPELINE_NAME, pipelineA.toString(), 10000, Duration.ofSeconds(1), Duration.ofSeconds(1), QueueStorageType.DROP_NEWER); assertTrue(writer.isOpen()); writer.close(); }finally{ diff --git a/logstash-core/src/test/java/org/logstash/common/io/DeadLetterQueueReaderTest.java b/logstash-core/src/test/java/org/logstash/common/io/DeadLetterQueueReaderTest.java index da8ae7e91e..629285378f 100644 --- a/logstash-core/src/test/java/org/logstash/common/io/DeadLetterQueueReaderTest.java +++ b/logstash-core/src/test/java/org/logstash/common/io/DeadLetterQueueReaderTest.java @@ -157,7 +157,7 @@ public void testRereadFinalBlock() throws Exception { long startTime = System.currentTimeMillis(); int messageSize = 0; try(DeadLetterQueueWriter writeManager = DeadLetterQueueWriter - .newBuilder(dir, 10 * 1024 * 1024, defaultDlqSize, Duration.ofSeconds(1)) + .newBuilder(dir, 10 * 1024 * 1024, defaultDlqSize, Duration.ofSeconds(1), Duration.ofSeconds(1)) .build()) { for (int i = 0; i < 2; i++) { DLQEntry entry = new DLQEntry(event, "", "", String.valueOf(i), constantSerializationLengthTimestamp(startTime++)); @@ -214,7 +214,7 @@ private void writeSegmentSizeEntries(int count) throws IOException { DLQEntry templateEntry = new DLQEntry(event, "1", "1", String.valueOf(0), constantSerializationLengthTimestamp(startTime)); int size = templateEntry.serialize().length + RecordIOWriter.RECORD_HEADER_SIZE + VERSION_SIZE; try (DeadLetterQueueWriter writeManager = DeadLetterQueueWriter - .newBuilder(dir, size, defaultDlqSize, Duration.ofSeconds(1)) + .newBuilder(dir, size, defaultDlqSize, Duration.ofSeconds(1), Duration.ofSeconds(1)) .build()) { for (int i = 1; i <= count; i++) { writeManager.writeEntry(new DLQEntry(event, "1", "1", String.valueOf(i), constantSerializationLengthTimestamp(startTime++))); @@ -248,7 +248,7 @@ public void testBlockBoundary() throws Exception { Timestamp timestamp = constantSerializationLengthTimestamp(); try (DeadLetterQueueWriter writeManager = DeadLetterQueueWriter - .newBuilder(dir, 10 * 1024 * 1024, defaultDlqSize, Duration.ofSeconds(1)) + .newBuilder(dir, 10 * 1024 * 1024, defaultDlqSize, Duration.ofSeconds(1), Duration.ofSeconds(1)) .build()) { for (int i = 0; i < 2; i++) { DLQEntry entry = new DLQEntry(event, "", "", "", timestamp); @@ -273,7 +273,7 @@ public void testBlockBoundaryMultiple() throws Exception { long startTime = System.currentTimeMillis(); int messageSize = 0; try (DeadLetterQueueWriter writeManager = DeadLetterQueueWriter - .newBuilder(dir, 10 * 1024 * 1024, defaultDlqSize, Duration.ofSeconds(1)) + .newBuilder(dir, 10 * 1024 * 1024, defaultDlqSize, Duration.ofSeconds(1), Duration.ofSeconds(1)) .build()) { for (int i = 1; i <= 5; i++) { DLQEntry entry = new DLQEntry(event, "", "", "", constantSerializationLengthTimestamp(startTime++)); @@ -298,7 +298,7 @@ public void testFlushAfterWriterClose() throws Exception { Timestamp timestamp = new Timestamp(); try (DeadLetterQueueWriter writeManager = DeadLetterQueueWriter - .newBuilder(dir, BLOCK_SIZE, defaultDlqSize, Duration.ofSeconds(1)) + .newBuilder(dir, BLOCK_SIZE, defaultDlqSize, Duration.ofSeconds(1), Duration.ofSeconds(1)) .build()) { for (int i = 0; i < 6; i++) { DLQEntry entry = new DLQEntry(event, "", "", Integer.toString(i), timestamp); @@ -320,7 +320,7 @@ public void testFlushAfterSegmentComplete() throws Exception { event.setField("T", generateMessageContent(PAD_FOR_BLOCK_SIZE_EVENT)); Timestamp timestamp = new Timestamp(); try (DeadLetterQueueWriter writeManager = DeadLetterQueueWriter - .newBuilder(dir, BLOCK_SIZE * EVENTS_BEFORE_FLUSH, defaultDlqSize, Duration.ofHours(1)) + .newBuilder(dir, BLOCK_SIZE * EVENTS_BEFORE_FLUSH, defaultDlqSize, Duration.ofHours(1), Duration.ofSeconds(1)) .build()) { for (int i = 1; i < EVENTS_BEFORE_FLUSH; i++) { DLQEntry entry = new DLQEntry(event, "", "", Integer.toString(i), timestamp); @@ -355,7 +355,7 @@ public void testMultiFlushAfterSegmentComplete() throws Exception { Timestamp timestamp = new Timestamp(); try (DeadLetterQueueWriter writeManager = DeadLetterQueueWriter - .newBuilder(dir, BLOCK_SIZE * eventsInSegment, defaultDlqSize, Duration.ofHours(1)) + .newBuilder(dir, BLOCK_SIZE * eventsInSegment, defaultDlqSize, Duration.ofHours(1), Duration.ofSeconds(1)) .build()) { for (int i = 1; i < totalEventsToWrite; i++) { DLQEntry entry = new DLQEntry(event, "", "", Integer.toString(i), timestamp); @@ -398,7 +398,7 @@ public void testFlushAfterDelay() throws Exception { System.out.println("events per block= " + eventsPerBlock); try(DeadLetterQueueWriter writeManager = DeadLetterQueueWriter - .newBuilder(dir, BLOCK_SIZE, defaultDlqSize, Duration.ofSeconds(2)) + .newBuilder(dir, BLOCK_SIZE, defaultDlqSize, Duration.ofSeconds(2), Duration.ofSeconds(1)) .build()) { for (int i = 1; i < eventsToWrite; i++) { DLQEntry entry = new DLQEntry(event, "", "", Integer.toString(i), timestamp); @@ -432,7 +432,7 @@ public void testBlockAndSegmentBoundary() throws Exception { Timestamp timestamp = constantSerializationLengthTimestamp(); try(DeadLetterQueueWriter writeManager = DeadLetterQueueWriter - .newBuilder(dir, BLOCK_SIZE, defaultDlqSize, Duration.ofSeconds(1)) + .newBuilder(dir, BLOCK_SIZE, defaultDlqSize, Duration.ofSeconds(1), Duration.ofSeconds(1)) .build()) { for (int i = 0; i < 2; i++) { DLQEntry entry = new DLQEntry(event, "", "", "", timestamp); @@ -455,7 +455,7 @@ public void testWriteReadRandomEventSize() throws Exception { long startTime = System.currentTimeMillis(); try(DeadLetterQueueWriter writeManager = DeadLetterQueueWriter - .newBuilder(dir, 10 * 1024 * 1024, defaultDlqSize, Duration.ofSeconds(1)) + .newBuilder(dir, 10 * 1024 * 1024, defaultDlqSize, Duration.ofSeconds(1), Duration.ofSeconds(1)) .build()) { for (int i = 0; i < eventCount; i++) { event.setField("message", generateMessageContent((int)(Math.random() * (maxEventSize)))); @@ -575,7 +575,7 @@ public void testConcurrentWriteReadRandomEventSize() throws Exception { final Event event = new Event(); long startTime = System.currentTimeMillis(); try (DeadLetterQueueWriter writeManager = DeadLetterQueueWriter - .newBuilder(dir, 10 * 1024 * 1024, defaultDlqSize, Duration.ofSeconds(10)) + .newBuilder(dir, 10 * 1024 * 1024, defaultDlqSize, Duration.ofSeconds(10), Duration.ofSeconds(1)) .build()) { for (int i = 0; i < eventCount; i++) { event.setField( @@ -920,7 +920,7 @@ private void seekReadAndVerify(final Timestamp seekTarget, final String expected private void writeEntries(final Event event, int offset, final int numberOfEvents, long startTime) throws IOException { try (DeadLetterQueueWriter writeManager = DeadLetterQueueWriter - .newBuilder(dir, 10 * 1024 * 1024, defaultDlqSize, Duration.ofSeconds(1)) + .newBuilder(dir, 10 * 1024 * 1024, defaultDlqSize, Duration.ofSeconds(1), Duration.ofSeconds(1)) .build()) { for (int i = offset; i <= offset + numberOfEvents; i++) { DLQEntry entry = new DLQEntry(event, "foo", "bar", String.valueOf(i), new Timestamp(startTime++)); @@ -943,7 +943,7 @@ private int prepareFilledSegmentFiles(int segments, long start) throws IOExcepti final int maxSegmentSize = 10 * MB; final int loopPerSegment = (int) Math.floor((maxSegmentSize - 1.0) / BLOCK_SIZE); try (DeadLetterQueueWriter writer = DeadLetterQueueWriter - .newBuilder(dir, maxSegmentSize, defaultDlqSize, Duration.ofSeconds(1)) + .newBuilder(dir, maxSegmentSize, defaultDlqSize, Duration.ofSeconds(1), Duration.ofSeconds(1)) .build()) { final int loops = loopPerSegment * segments; for (int i = 0; i < loops; i++) { diff --git a/logstash-core/src/test/java/org/logstash/common/io/DeadLetterQueueWriterAgeRetentionTest.java b/logstash-core/src/test/java/org/logstash/common/io/DeadLetterQueueWriterAgeRetentionTest.java index 2f6e6161c2..95d851ce04 100644 --- a/logstash-core/src/test/java/org/logstash/common/io/DeadLetterQueueWriterAgeRetentionTest.java +++ b/logstash-core/src/test/java/org/logstash/common/io/DeadLetterQueueWriterAgeRetentionTest.java @@ -66,6 +66,9 @@ void executeAction() { private SynchronizedScheduledService synchScheduler; + private static final long MAX_SEGMENT_SIZE = 10 * MB; + private static final long MAX_QUEUE_SIZE = 1 * GB; + @Before public void setUp() throws Exception { dir = temporaryFolder.newFolder().toPath(); @@ -73,6 +76,11 @@ public void setUp() throws Exception { synchScheduler = new SynchronizedScheduledService(); } + private static DeadLetterQueueWriter.Builder newBuilder(final Path queuePath) { + return DeadLetterQueueWriter + .newBuilder(queuePath, MAX_SEGMENT_SIZE, MAX_QUEUE_SIZE, Duration.ofSeconds(1), Duration.ofSeconds(1)); + } + @Test public void testRemovesOlderSegmentsWhenWriteOnReopenedDLQContainingExpiredSegments() throws IOException { final Event event = DeadLetterQueueReaderTest.createEventWithConstantSerializationOverhead(Collections.emptyMap()); @@ -85,8 +93,7 @@ public void testRemovesOlderSegmentsWhenWriteOnReopenedDLQContainingExpiredSegme // Exercise final long prevQueueSize; final long beheadedQueueSize; - try (DeadLetterQueueWriter writeManager = DeadLetterQueueWriter - .newBuilder(dir, 10 * MB, 1 * GB, Duration.ofSeconds(1)) + try (DeadLetterQueueWriter writeManager = newBuilder(dir) .retentionTime(Duration.ofDays(2)) .clock(fakeClock) .build()) { @@ -109,8 +116,7 @@ private void prepareDLQWithFirstSegmentOlderThanRetainPeriod(Event event, Clock final Duration littleMoreThanRetainedPeriod = retainedPeriod.plusMinutes(1); long startTime = fakeClock.instant().minus(littleMoreThanRetainedPeriod).toEpochMilli(); int messageSize = 0; - try (DeadLetterQueueWriter writeManager = DeadLetterQueueWriter - .newBuilder(dir, 10 * MB, 1 * GB, Duration.ofSeconds(1)) + try (DeadLetterQueueWriter writeManager = newBuilder(dir) .retentionTime(retainedPeriod) .clock(fakeClock) .build()) { @@ -136,8 +142,7 @@ public void testRemovesOlderSegmentsWhenWritesIntoDLQContainingExpiredSegments() int messageSize = 0; final Duration retention = Duration.ofDays(2); - try (DeadLetterQueueWriter writeManager = DeadLetterQueueWriter - .newBuilder(dir, 10 * MB, 1 * GB, Duration.ofSeconds(1)) + try (DeadLetterQueueWriter writeManager = newBuilder(dir) .retentionTime(retention) .clock(fakeClock) .build()) { @@ -179,8 +184,7 @@ public void testRemoveMultipleOldestSegmentsWhenRetainedAgeIsExceeded() throws I int messageSize = 0; final Duration retention = Duration.ofDays(2); - try (DeadLetterQueueWriter writeManager = DeadLetterQueueWriter - .newBuilder(dir, 10 * MB, 1 * GB, Duration.ofSeconds(1)) + try (DeadLetterQueueWriter writeManager = newBuilder(dir) .retentionTime(retention) .clock(fakeClock) .build()) { @@ -272,9 +276,7 @@ public void testDLQWriterFlusherRemovesExpiredSegmentWhenCurrentWriterIsStale() final ManualAdvanceClock fakeClock = new ManualAdvanceClock(ZoneId.of("Europe/Rome")); Duration retainedPeriod = Duration.ofDays(1); - Duration flushInterval = Duration.ofSeconds(1); - try (DeadLetterQueueWriter writeManager = DeadLetterQueueWriter - .newBuilder(dir, 10 * MB, 1 * GB, flushInterval) + try (DeadLetterQueueWriter writeManager = newBuilder(dir) .retentionTime(retainedPeriod) .clock(fakeClock) .build()) { diff --git a/logstash-core/src/test/java/org/logstash/common/io/DeadLetterQueueWriterTest.java b/logstash-core/src/test/java/org/logstash/common/io/DeadLetterQueueWriterTest.java index 1e507bcdfe..e6f7d9e64e 100644 --- a/logstash-core/src/test/java/org/logstash/common/io/DeadLetterQueueWriterTest.java +++ b/logstash-core/src/test/java/org/logstash/common/io/DeadLetterQueueWriterTest.java @@ -72,11 +72,26 @@ public void setUp() throws Exception { private static long EMPTY_DLQ = VERSION_SIZE; // Only the version field has been written + private static final long DEFAULT_MAX_SEGMENT_SIZE = 1_000; + private static final long DEFAULT_MAX_QUEUE_SIZE = 100_000; + + private static DeadLetterQueueWriter.Builder newBuilder(final Path queuePath) { + return newBuilder(queuePath, DEFAULT_MAX_SEGMENT_SIZE, DEFAULT_MAX_QUEUE_SIZE); + } + + private static DeadLetterQueueWriter.Builder newBuilder(final Path queuePath, final long maxSegmentSize, final long maxQueueSize) { + return newBuilder(queuePath, maxSegmentSize, maxQueueSize, Duration.ofSeconds(1)); + } + + private static DeadLetterQueueWriter.Builder newBuilder(final Path queuePath, final long maxSegmentSize, final long maxQueueSize, final Duration flushInterval) { + return DeadLetterQueueWriter + .newBuilder(queuePath, maxSegmentSize, maxQueueSize, flushInterval, Duration.ofSeconds(1)); + } + @Test public void testLockFileManagement() throws Exception { Path lockFile = dir.resolve(".lock"); - DeadLetterQueueWriter writer = DeadLetterQueueWriter - .newBuilder(dir, 1_000, 100_000, Duration.ofSeconds(1)) + DeadLetterQueueWriter writer = newBuilder(dir) .build(); assertTrue(Files.exists(lockFile)); writer.close(); @@ -85,12 +100,10 @@ public void testLockFileManagement() throws Exception { @Test public void testFileLocking() throws Exception { - DeadLetterQueueWriter writer = DeadLetterQueueWriter - .newBuilder(dir, 1_000, 100_000, Duration.ofSeconds(1)) + DeadLetterQueueWriter writer = newBuilder(dir) .build(); try { - DeadLetterQueueWriter - .newBuilder(dir, 100, 1_000, Duration.ofSeconds(1)) + newBuilder(dir, 100, 1_000) .build(); fail(); } catch (LockException e) { @@ -103,8 +116,7 @@ public void testFileLocking() throws Exception { public void testUncleanCloseOfPreviousWriter() throws Exception { Path lockFilePath = dir.resolve(".lock"); boolean created = lockFilePath.toFile().createNewFile(); - DeadLetterQueueWriter writer = DeadLetterQueueWriter - .newBuilder(dir, 1_000, 100_000, Duration.ofSeconds(1)) + DeadLetterQueueWriter writer = newBuilder(dir) .build(); FileChannel channel = FileChannel.open(lockFilePath, StandardOpenOption.WRITE); @@ -120,8 +132,7 @@ public void testUncleanCloseOfPreviousWriter() throws Exception { @Test public void testWrite() throws Exception { - DeadLetterQueueWriter writer = DeadLetterQueueWriter - .newBuilder(dir, 1_000, 100_000, Duration.ofSeconds(1)) + DeadLetterQueueWriter writer = newBuilder(dir) .build(); DLQEntry entry = new DLQEntry(new Event(), "type", "id", "reason"); writer.writeEntry(entry); @@ -135,8 +146,7 @@ public void testDoesNotWriteMessagesAlreadyRoutedThroughDLQ() throws Exception { DLQEntry entry = new DLQEntry(new Event(), "type", "id", "reason"); DLQEntry dlqEntry = new DLQEntry(dlqEvent, "type", "id", "reason"); - try (DeadLetterQueueWriter writer = DeadLetterQueueWriter - .newBuilder(dir, 1_000, 100_000, Duration.ofSeconds(1)) + try (DeadLetterQueueWriter writer = newBuilder(dir) .build()) { writer.writeEntry(entry); long dlqLengthAfterEvent = dlqLength(); @@ -156,8 +166,7 @@ public void testDoesNotWriteBeyondLimit() throws Exception { long MAX_QUEUE_LENGTH = payloadLength * MESSAGE_COUNT; - try (DeadLetterQueueWriter writer = DeadLetterQueueWriter - .newBuilder(dir, payloadLength, MAX_QUEUE_LENGTH, Duration.ofSeconds(1)) + try (DeadLetterQueueWriter writer = newBuilder(dir, payloadLength, MAX_QUEUE_LENGTH) .build()) { for (int i = 0; i < MESSAGE_COUNT; i++) @@ -174,8 +183,7 @@ public void testDoesNotWriteBeyondLimit() throws Exception { @Test public void testSlowFlush() throws Exception { - try (DeadLetterQueueWriter writer = DeadLetterQueueWriter - .newBuilder(dir, 1_000, 1_000_000, Duration.ofSeconds(1)) + try (DeadLetterQueueWriter writer = newBuilder(dir, 1_000, 1_000_000) .build()) { DLQEntry entry = new DLQEntry(new Event(), "type", "id", "1"); writer.writeEntry(entry); @@ -196,8 +204,7 @@ public void testSlowFlush() throws Exception { @Test public void testNotFlushed() throws Exception { - try (DeadLetterQueueWriter writeManager = DeadLetterQueueWriter - .newBuilder(dir, BLOCK_SIZE, 1_000_000_000, Duration.ofSeconds(5)) + try (DeadLetterQueueWriter writeManager = newBuilder(dir, BLOCK_SIZE, 1_000_000_000, Duration.ofSeconds(5)) .build()) { for (int i = 0; i < 4; i++) { DLQEntry entry = new DLQEntry(new Event(), "type", "id", "1"); @@ -219,8 +226,7 @@ public void testNotFlushed() throws Exception { @Test public void testCloseFlush() throws Exception { - try (DeadLetterQueueWriter writer = DeadLetterQueueWriter - .newBuilder(dir, 1_000, 1_000_000, Duration.ofHours(1)) + try (DeadLetterQueueWriter writer = newBuilder(dir, 1_000, 1_000_000, Duration.ofHours(1)) .build()) { DLQEntry entry = new DLQEntry(new Event(), "type", "id", "1"); writer.writeEntry(entry); @@ -255,8 +261,7 @@ public void testRemoveOldestSegmentWhenRetainedSizeIsExceededAndDropOlderModeIsE long startTime = System.currentTimeMillis(); int messageSize = 0; - try (DeadLetterQueueWriter writeManager = DeadLetterQueueWriter - .newBuilder(dir, 10 * MB, 20 * MB, Duration.ofSeconds(1)) + try (DeadLetterQueueWriter writeManager = newBuilder(dir, 10 * MB, 20 * MB) .build()) { // 320 generates 10 Mb of data @@ -287,8 +292,7 @@ public void testRemoveOldestSegmentWhenRetainedSizeIsExceededAndDropOlderModeIsE final long prevQueueSize; final long beheadedQueueSize; long droppedEvent; - try (DeadLetterQueueWriter writeManager = DeadLetterQueueWriter - .newBuilder(dir, 10 * MB, 20 * MB, Duration.ofSeconds(1)) + try (DeadLetterQueueWriter writeManager = newBuilder(dir, 10 * MB, 20 * MB) .storageType(QueueStorageType.DROP_OLDER) .build()) { prevQueueSize = writeManager.getCurrentQueueSize(); @@ -325,8 +329,7 @@ public void testRemoveOldestSegmentWhenRetainedSizeIsExceededAndDropOlderModeIsE @Test public void testRemoveSegmentsOrder() throws IOException { - try (DeadLetterQueueWriter sut = DeadLetterQueueWriter - .newBuilder(dir, 10 * MB, 20 * MB, Duration.ofSeconds(1)) + try (DeadLetterQueueWriter sut = newBuilder(dir, 10 * MB, 20 * MB) .build()) { // create some segments files Files.createFile(dir.resolve("9.log")); @@ -495,8 +498,7 @@ public void testDropEventCountCorrectlyNotEnqueuedEvents() throws IOException, I Event bigEvent = DeadLetterQueueReaderTest.createEventWithConstantSerializationOverhead(Collections.emptyMap()); bigEvent.setField("message", DeadLetterQueueReaderTest.generateMessageContent(2 * BLOCK_SIZE)); - try (DeadLetterQueueWriter writeManager = DeadLetterQueueWriter - .newBuilder(dir, 10 * MB, 20 * MB, Duration.ofSeconds(1)) + try (DeadLetterQueueWriter writeManager = newBuilder(dir, 10 * MB, 20 * MB) .build()) { // enqueue a record with size smaller than BLOCK_SIZE DLQEntry entry = new DLQEntry(blockAlmostFullEvent, "", "", "00001", DeadLetterQueueReaderTest.constantSerializationLengthTimestamp(System.currentTimeMillis())); @@ -513,8 +515,7 @@ public void testDropEventCountCorrectlyNotEnqueuedEvents() throws IOException, I // fill the queue to push out the segment with the 2 previous events Event event = DeadLetterQueueReaderTest.createEventWithConstantSerializationOverhead(Collections.emptyMap()); event.setField("message", DeadLetterQueueReaderTest.generateMessageContent(32500)); - try (DeadLetterQueueWriter writeManager = DeadLetterQueueWriter - .newBuilder(dir, 10 * MB, 20 * MB, Duration.ofSeconds(1)) + try (DeadLetterQueueWriter writeManager = newBuilder(dir, 10 * MB, 20 * MB) .storageType(QueueStorageType.DROP_NEWER) .build()) { @@ -543,16 +544,14 @@ public void testDropEventCountCorrectlyNotEnqueuedEvents() throws IOException, I public void testInitializeWriterWith1ByteEntry() throws Exception { Files.write(dir.resolve("1.log"), "1".getBytes()); - DeadLetterQueueWriter writer = DeadLetterQueueWriter - .newBuilder(dir, 1_000, 100_000, Duration.ofSeconds(1)) + DeadLetterQueueWriter writer = newBuilder(dir) .build(); writer.close(); } @Test public void givenDLQWriterCreatedSomeSegmentsWhenReaderWithCleanConsumedNotifyTheDeletionOfSomeThenWriterUpdatesItsMetricsSize() throws IOException, InterruptedException { - try (DeadLetterQueueWriter writer = DeadLetterQueueWriter - .newBuilder(dir, 1 * MB, 100 * MB, Duration.ofSeconds(1)) + try (DeadLetterQueueWriter writer = newBuilder(dir, 1 * MB, 100 * MB) .build()) { // fill at least 3 segments @@ -616,4 +615,53 @@ private long countDlqSegments(Path dir) throws IOException { } } + @Test + public void givenFlushIntervalGreaterThanMinimumWhenNormalizedThenRemainsUnmodified() { + DeadLetterQueueWriter.Builder builder = newBuilder(dir); + Duration result = builder.normalizeFlushInterval(Duration.ofSeconds(10)); + assertEquals("Valid flush interval should remain unchanged", Duration.ofSeconds(10), result); + } + + @Test + public void givenFlushIntervalBelowTheMinimumWhenNormalizedThenIsClampedToMinimum() { + DeadLetterQueueWriter.Builder builder = newBuilder(dir); + Duration result = builder.normalizeFlushInterval(Duration.ofMillis(100)); + assertEquals("Flush interval below 1s should be clamped to 1s", Duration.ofSeconds(1), result); + } + + @Test + public void testNormalizeFlushCheckIntervalWithinLimits() { + DeadLetterQueueWriter.Builder builder = newBuilder(dir); + Duration flushInterval = Duration.ofSeconds(5); + Duration flushCheckInterval = Duration.ofSeconds(2); + Duration result = builder.normalizeFlushCheckInterval(flushCheckInterval, flushInterval); + assertEquals("Valid flush check interval should remain unchanged", flushCheckInterval, result); + } + + @Test + public void givenFlushCheckIntervalBelowMinimumWhenNormalizedThenClampedToMinimum() { + DeadLetterQueueWriter.Builder builder = newBuilder(dir); + Duration flushInterval = Duration.ofSeconds(5); + Duration belowMinimum = Duration.ofMillis(500); + Duration result = builder.normalizeFlushCheckInterval(belowMinimum, flushInterval); + assertEquals("Flush check interval below 1s should be clamped to 1s", Duration.ofSeconds(1), result); + } + + @Test + public void givenFlushCheckIntervalExceedsFlushIntervalWhenNormalizedThenClampedToFlushInterval() { + DeadLetterQueueWriter.Builder builder = newBuilder(dir); + Duration flushInterval = Duration.ofSeconds(3); + Duration aboveFlushInterval = Duration.ofSeconds(5); + Duration result = builder.normalizeFlushCheckInterval(aboveFlushInterval, flushInterval); + assertEquals("Flush check interval exceeding flush interval should be clamped to flush interval", flushInterval, result); + } + + @Test + public void givenFlushCheckIntervalJustBelowFlushIntervalWhenNormalizedThenAccepted() { + DeadLetterQueueWriter.Builder builder = newBuilder(dir); + Duration flushInterval = Duration.ofSeconds(5); + Duration justBelowFlushInterval = Duration.ofMillis(4900); + Duration result = builder.normalizeFlushCheckInterval(justBelowFlushInterval, flushInterval); + assertEquals("Flush check interval just below flush interval should be accepted", justBelowFlushInterval, result); + } }