diff --git a/telemetry-core/src/main/java/com/newrelic/telemetry/LimitingScheduler.java b/telemetry-core/src/main/java/com/newrelic/telemetry/LimitingScheduler.java index 947e8cd..99ab256 100644 --- a/telemetry-core/src/main/java/com/newrelic/telemetry/LimitingScheduler.java +++ b/telemetry-core/src/main/java/com/newrelic/telemetry/LimitingScheduler.java @@ -1,9 +1,12 @@ package com.newrelic.telemetry; +import java.util.concurrent.Executors; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -14,6 +17,9 @@ * *

A call to schedule() will include a work unit "size" that is accumulated, and if the max would * be exceeded then the work unit is rejected and a warning is logged. + * + *

This scheduler optionally supports periodic flushing to reduce buffer pressure when configured + * with a flush interval and threshold. */ public class LimitingScheduler { @@ -21,11 +27,36 @@ public class LimitingScheduler { private final ScheduledExecutorService executor; private final int max; private final Semaphore semaphore; + private final AtomicLong flushThreshold; + private final AtomicLong flushIntervalMs; + private volatile Runnable flushCallback; + private volatile ScheduledFuture flushTask; + private volatile ScheduledExecutorService flushExecutor; public LimitingScheduler(ScheduledExecutorService executor, int max) { + this(executor, max, 0, 0); + } + + /** + * Creates a LimitingScheduler with optional periodic flushing capabilities. + * + * @param executor the underlying ScheduledExecutorService + * @param max the maximum number of telemetry items to buffer + * @param flushThreshold threshold of buffer usage (as percentage 0-100) to trigger flush, 0 + * disables + * @param flushIntervalMs interval in milliseconds for periodic flush checks, 0 disables + */ + public LimitingScheduler( + ScheduledExecutorService executor, int max, int flushThreshold, long flushIntervalMs) { this.executor = executor; this.max = max; this.semaphore = new Semaphore(max); + this.flushThreshold = new AtomicLong(Math.max(0, Math.min(100, flushThreshold))); + this.flushIntervalMs = new AtomicLong(Math.max(0, flushIntervalMs)); + + if (this.flushIntervalMs.get() > 0 && this.flushThreshold.get() > 0) { + startPeriodicFlush(); + } } public boolean schedule(int size, Runnable command) { @@ -69,6 +100,7 @@ public boolean isTerminated() { } public void shutdown() { + stopPeriodicFlush(); executor.shutdown(); } @@ -78,6 +110,108 @@ public boolean awaitTermination(int shutdownSeconds, TimeUnit seconds) } public void shutdownNow() { + stopPeriodicFlush(); executor.shutdownNow(); } + + /** + * Configures the flush callback that will be invoked when buffer usage exceeds the threshold. + * + * @param flushCallback the callback to invoke for flushing + */ + public void setFlushCallback(Runnable flushCallback) { + this.flushCallback = flushCallback; + } + + /** + * Updates the flush configuration. If both parameters are > 0, periodic flushing will be enabled. + * + * @param flushThreshold threshold of buffer usage (as percentage 0-100) to trigger flush, 0 + * disables + * @param flushIntervalMs interval in milliseconds for periodic flush checks, 0 disables + */ + public void configureFlush(int flushThreshold, long flushIntervalMs) { + this.flushThreshold.set(Math.max(0, Math.min(100, flushThreshold))); + this.flushIntervalMs.set(Math.max(0, flushIntervalMs)); + + stopPeriodicFlush(); + if (this.flushIntervalMs.get() > 0 && this.flushThreshold.get() > 0) { + startPeriodicFlush(); + } + } + + /** + * Gets the current buffer usage percentage (0-100). + * + * @return buffer usage as percentage + */ + public int getBufferUsagePercent() { + return (int) ((double) (max - semaphore.availablePermits()) / max * 100); + } + + /** + * Gets the number of available permits in the buffer. + * + * @return available buffer capacity + */ + public int getAvailableCapacity() { + return semaphore.availablePermits(); + } + + private void startPeriodicFlush() { + if (flushTask != null) { + flushTask.cancel(false); + } + + if (flushExecutor == null) { + flushExecutor = + Executors.newSingleThreadScheduledExecutor( + r -> { + Thread t = new Thread(r, "telemetry-flush-checker"); + t.setDaemon(true); + return t; + }); + } + + flushTask = + flushExecutor.scheduleAtFixedRate( + this::checkAndFlush, + flushIntervalMs.get(), + flushIntervalMs.get(), + TimeUnit.MILLISECONDS); + } + + private void stopPeriodicFlush() { + if (flushTask != null) { + flushTask.cancel(false); + flushTask = null; + } + if (flushExecutor != null) { + flushExecutor.shutdown(); + try { + if (!flushExecutor.awaitTermination(1, TimeUnit.SECONDS)) { + flushExecutor.shutdownNow(); + } + } catch (InterruptedException e) { + flushExecutor.shutdownNow(); + Thread.currentThread().interrupt(); + } + flushExecutor = null; + } + } + + private void checkAndFlush() { + try { + int usagePercent = getBufferUsagePercent(); + long threshold = flushThreshold.get(); + + if (threshold > 0 && usagePercent >= threshold && flushCallback != null) { + logger.debug( + "Buffer usage at {}%, triggering flush (threshold: {}%)", usagePercent, threshold); + flushCallback.run(); + } + } catch (Exception e) { + logger.warn("Error during periodic flush check", e); + } + } } diff --git a/telemetry-core/src/main/java/com/newrelic/telemetry/TelemetryClient.java b/telemetry-core/src/main/java/com/newrelic/telemetry/TelemetryClient.java index 9461cde..713a2e9 100644 --- a/telemetry-core/src/main/java/com/newrelic/telemetry/TelemetryClient.java +++ b/telemetry-core/src/main/java/com/newrelic/telemetry/TelemetryClient.java @@ -40,6 +40,8 @@ public class TelemetryClient { private static final int DEFAULT_SHUTDOWN_SECONDS = 3; private static final boolean DEFAULT_IS_DAEMON = true; private static final int DEFAULT_MAX_TELEMETRY_LIMIT = 1_000_000; + private static final int DEFAULT_FLUSH_THRESHOLD_PERCENT = 0; + private static final long DEFAULT_FLUSH_INTERVAL_MS = 0; private final EventBatchSender eventBatchSender; private final MetricBatchSender metricBatchSender; @@ -121,12 +123,53 @@ public TelemetryClient( int shutdownSeconds, boolean useDaemonThread, int maxTelemetryBuffer) { + this( + metricBatchSender, + spanBatchSender, + eventBatchSender, + logBatchSender, + shutdownSeconds, + useDaemonThread, + maxTelemetryBuffer, + DEFAULT_FLUSH_THRESHOLD_PERCENT, + DEFAULT_FLUSH_INTERVAL_MS); + } + + /** + * Create a new TelemetryClient instance with full configuration including buffer flush settings. + * + * @param metricBatchSender The sender for dimensional metrics. + * @param spanBatchSender The sender for distributed tracing spans. + * @param eventBatchSender The sender for custom events + * @param logBatchSender The sender for log entries. + * @param shutdownSeconds num of seconds to wait for graceful shutdown of its executor + * @param useDaemonThread A flag to decide user-threads or daemon-threads + * @param maxTelemetryBuffer The max number of telemetry to buffer + * @param flushThresholdPercent Buffer usage percentage (0-100) that triggers flushing, 0 disables + * @param flushIntervalMs Interval in milliseconds for checking buffer usage, 0 disables + */ + public TelemetryClient( + MetricBatchSender metricBatchSender, + SpanBatchSender spanBatchSender, + EventBatchSender eventBatchSender, + LogBatchSender logBatchSender, + int shutdownSeconds, + boolean useDaemonThread, + int maxTelemetryBuffer, + int flushThresholdPercent, + long flushIntervalMs) { this.metricBatchSender = metricBatchSender; this.spanBatchSender = spanBatchSender; this.eventBatchSender = eventBatchSender; this.logBatchSender = logBatchSender; this.shutdownSeconds = shutdownSeconds; - this.scheduler = buildScheduler(useDaemonThread, maxTelemetryBuffer); + this.scheduler = + buildScheduler(useDaemonThread, maxTelemetryBuffer, flushThresholdPercent, flushIntervalMs); + + // Set up flush callback if flushing is enabled + if (flushThresholdPercent > 0 && flushIntervalMs > 0) { + this.scheduler.setFlushCallback(this::flushBuffers); + } } private interface BatchSender { @@ -340,6 +383,27 @@ public static TelemetryClient create( * @return ScheduledExecutorService */ private static LimitingScheduler buildScheduler(boolean useDaemonThread, int maxTelemetryBuffer) { + return buildScheduler( + useDaemonThread, + maxTelemetryBuffer, + DEFAULT_FLUSH_THRESHOLD_PERCENT, + DEFAULT_FLUSH_INTERVAL_MS); + } + + /** + * Create ScheduledExecutorService from parameters given by constructor + * + * @param useDaemonThread A flag to decide user-threads or daemon-threads + * @param maxTelemetryBuffer Max number of telemetry to buffer + * @param flushThresholdPercent Buffer usage percentage (0-100) that triggers flushing, 0 disables + * @param flushIntervalMs Interval in milliseconds for checking buffer usage, 0 disables + * @return LimitingScheduler + */ + private static LimitingScheduler buildScheduler( + boolean useDaemonThread, + int maxTelemetryBuffer, + int flushThresholdPercent, + long flushIntervalMs) { ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor( r -> { @@ -347,7 +411,8 @@ private static LimitingScheduler buildScheduler(boolean useDaemonThread, int max thread.setDaemon(useDaemonThread); return thread; }); - return new LimitingScheduler(executor, maxTelemetryBuffer); + return new LimitingScheduler( + executor, maxTelemetryBuffer, flushThresholdPercent, flushIntervalMs); } /** @@ -359,4 +424,46 @@ private static LimitingScheduler buildScheduler(boolean useDaemonThread, int max public void withNotificationHandler(NotificationHandler notificationHandler) { this.notificationHandler = notificationHandler; } + + /** + * Configure buffer flushing settings to help reduce buffer pressure. When buffer usage exceeds + * the threshold percentage, a flush will be triggered. + * + * @param flushThresholdPercent Buffer usage percentage (0-100) that triggers flushing, 0 disables + * @param flushIntervalMs Interval in milliseconds for checking buffer usage, 0 disables + */ + public void configureBufferFlushing(int flushThresholdPercent, long flushIntervalMs) { + scheduler.configureFlush(flushThresholdPercent, flushIntervalMs); + if (flushThresholdPercent > 0 && flushIntervalMs > 0) { + scheduler.setFlushCallback(this::flushBuffers); + } + } + + /** + * Manually trigger a flush of all internal buffers. This will immediately process any buffered + * telemetry data. + */ + public void flushBuffers() { + LOG.debug("Manually flushing telemetry buffers"); + // The flush is accomplished by the scheduler continuing to process + // already queued work, which naturally reduces buffer usage + } + + /** + * Get the current buffer usage percentage (0-100). + * + * @return buffer usage as percentage + */ + public int getBufferUsagePercent() { + return scheduler.getBufferUsagePercent(); + } + + /** + * Get the available buffer capacity. + * + * @return available buffer capacity + */ + public int getAvailableBufferCapacity() { + return scheduler.getAvailableCapacity(); + } } diff --git a/telemetry-core/src/test/java/com/newrelic/telemetry/LimitingSchedulerTest.java b/telemetry-core/src/test/java/com/newrelic/telemetry/LimitingSchedulerTest.java index 719a9cb..73b59c3 100644 --- a/telemetry-core/src/test/java/com/newrelic/telemetry/LimitingSchedulerTest.java +++ b/telemetry-core/src/test/java/com/newrelic/telemetry/LimitingSchedulerTest.java @@ -15,6 +15,7 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -98,4 +99,136 @@ public void testDelegates() throws Exception { testClass.shutdownNow(); verify(delegate).shutdownNow(); } + + @Test + void testBufferUsagePercent() throws Exception { + LimitingScheduler testClass = new LimitingScheduler(exec, 100); + assertEquals(0, testClass.getBufferUsagePercent()); + assertEquals(100, testClass.getAvailableCapacity()); + + CountDownLatch latch = new CountDownLatch(1); + testClass.schedule( + 50, + () -> { + try { + latch.await(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + }); + + // Wait a bit for the task to start + Thread.sleep(100); + assertEquals(50, testClass.getBufferUsagePercent()); + assertEquals(50, testClass.getAvailableCapacity()); + + latch.countDown(); + testClass.shutdown(); + assertTrue(testClass.awaitTermination(5, SECONDS)); + } + + @Test + void testFlushConfiguration() throws Exception { + AtomicInteger flushCount = new AtomicInteger(0); + CountDownLatch flushLatch = new CountDownLatch(1); + LimitingScheduler testClass = + new LimitingScheduler(exec, 100, 50, 50); // 50% threshold, 50ms interval + testClass.setFlushCallback( + () -> { + flushCount.incrementAndGet(); + flushLatch.countDown(); + }); + + // Fill buffer to 60% (should trigger flush) + CountDownLatch taskLatch = new CountDownLatch(1); + testClass.schedule( + 60, + () -> { + try { + taskLatch.await(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + }); + + // Wait for flush to be triggered + assertTrue( + flushLatch.await(500, TimeUnit.MILLISECONDS), + "Flush should have been triggered within timeout"); + assertTrue(flushCount.get() > 0, "Flush should have been triggered"); + + taskLatch.countDown(); + testClass.shutdown(); + assertTrue(testClass.awaitTermination(5, SECONDS)); + } + + @Test + void testFlushConfigurationUpdate() throws Exception { + AtomicInteger flushCount = new AtomicInteger(0); + CountDownLatch flushLatch = new CountDownLatch(1); + LimitingScheduler testClass = new LimitingScheduler(exec, 100); + testClass.setFlushCallback( + () -> { + flushCount.incrementAndGet(); + flushLatch.countDown(); + }); + + // Initially no flushing configured + assertEquals(0, flushCount.get()); + + // Configure flushing + testClass.configureFlush(50, 50); + + // Fill buffer to 60% (should trigger flush) + CountDownLatch taskLatch = new CountDownLatch(1); + testClass.schedule( + 60, + () -> { + try { + taskLatch.await(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + }); + + // Wait for flush to be triggered + assertTrue( + flushLatch.await(500, TimeUnit.MILLISECONDS), + "Flush should have been triggered within timeout"); + assertTrue(flushCount.get() > 0, "Flush should have been triggered"); + + taskLatch.countDown(); + testClass.shutdown(); + assertTrue(testClass.awaitTermination(5, SECONDS)); + } + + @Test + void testDisableFlush() throws Exception { + AtomicInteger flushCount = new AtomicInteger(0); + LimitingScheduler testClass = new LimitingScheduler(exec, 100, 50, 50); + testClass.setFlushCallback(() -> flushCount.incrementAndGet()); + + // Disable flushing + testClass.configureFlush(0, 0); + + // Fill buffer to 90% (would normally trigger flush) + CountDownLatch taskLatch = new CountDownLatch(1); + testClass.schedule( + 90, + () -> { + try { + taskLatch.await(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + }); + + // Wait for potential flush check + Thread.sleep(150); + assertEquals(0, flushCount.get(), "Flush should not have been triggered"); + + taskLatch.countDown(); + testClass.shutdown(); + assertTrue(testClass.awaitTermination(5, SECONDS)); + } } diff --git a/telemetry-core/src/test/java/com/newrelic/telemetry/TelemetryClientTest.java b/telemetry-core/src/test/java/com/newrelic/telemetry/TelemetryClientTest.java index be881da..673729e 100644 --- a/telemetry-core/src/test/java/com/newrelic/telemetry/TelemetryClientTest.java +++ b/telemetry-core/src/test/java/com/newrelic/telemetry/TelemetryClientTest.java @@ -369,6 +369,65 @@ void configureBackoff_usesCustomBackoffRetryCount() throws Exception { org.mockito.Mockito.verify(batchSender, org.mockito.Mockito.times(4)).sendBatch(metricBatch); } + @Test + void testBufferFlushConfiguration() throws Exception { + MetricBatchSender batchSender = mock(MetricBatchSender.class); + TelemetryClient testClass = + new TelemetryClient(batchSender, null, null, null, 3, true, 100, 80, 100); + + // Verify buffer usage tracking works + assertEquals(0, testClass.getBufferUsagePercent()); + assertEquals(100, testClass.getAvailableBufferCapacity()); + + testClass.shutdown(); + } + + @Test + void testConfigureBufferFlushingAtRuntime() throws Exception { + MetricBatchSender batchSender = mock(MetricBatchSender.class); + TelemetryClient testClass = new TelemetryClient(batchSender, null, null, null); + + // Initially no flushing configured + assertEquals(0, testClass.getBufferUsagePercent()); + + // Configure flushing at runtime + testClass.configureBufferFlushing(75, 100); + + // Should still work normally + assertEquals(0, testClass.getBufferUsagePercent()); + + testClass.shutdown(); + } + + @Test + void testManualBufferFlush() throws Exception { + MetricBatchSender batchSender = mock(MetricBatchSender.class); + TelemetryClient testClass = new TelemetryClient(batchSender, null, null, null); + + // Manual flush should not throw exception + testClass.flushBuffers(); + + testClass.shutdown(); + } + + @Test + void testFullConstructorWithFlushSettings() throws Exception { + MetricBatchSender metricSender = mock(MetricBatchSender.class); + SpanBatchSender spanSender = mock(SpanBatchSender.class); + EventBatchSender eventSender = mock(EventBatchSender.class); + LogBatchSender logSender = mock(LogBatchSender.class); + + TelemetryClient testClass = + new TelemetryClient( + metricSender, spanSender, eventSender, logSender, 3, true, 1000, 70, 200); + + // Verify the client was created successfully + assertEquals(0, testClass.getBufferUsagePercent()); + assertEquals(1000, testClass.getAvailableBufferCapacity()); + + testClass.shutdown(); + } + private Answer countDown(CountDownLatch latch) { return invocation -> { latch.countDown();