Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
package com.newrelic.telemetry;

import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -14,18 +17,46 @@
*
* <p>A call to schedule() will include a work unit "size" that is accumulated, and if the max would
* be exceeded then the work unit is rejected and a warning is logged.
*
* <p>This scheduler optionally supports periodic flushing to reduce buffer pressure when configured
* with a flush interval and threshold.
*/
public class LimitingScheduler {

private static final Logger logger = LoggerFactory.getLogger(LimitingScheduler.class);
private final ScheduledExecutorService executor;
private final int max;
private final Semaphore semaphore;
private final AtomicLong flushThreshold;
private final AtomicLong flushIntervalMs;
private volatile Runnable flushCallback;
private volatile ScheduledFuture<?> flushTask;
private volatile ScheduledExecutorService flushExecutor;

public LimitingScheduler(ScheduledExecutorService executor, int max) {
this(executor, max, 0, 0);
}

/**
* Creates a LimitingScheduler with optional periodic flushing capabilities.
*
* @param executor the underlying ScheduledExecutorService
* @param max the maximum number of telemetry items to buffer
* @param flushThreshold threshold of buffer usage (as percentage 0-100) to trigger flush, 0
* disables
* @param flushIntervalMs interval in milliseconds for periodic flush checks, 0 disables
*/
public LimitingScheduler(
ScheduledExecutorService executor, int max, int flushThreshold, long flushIntervalMs) {
this.executor = executor;
this.max = max;
this.semaphore = new Semaphore(max);
this.flushThreshold = new AtomicLong(Math.max(0, Math.min(100, flushThreshold)));
this.flushIntervalMs = new AtomicLong(Math.max(0, flushIntervalMs));

if (this.flushIntervalMs.get() > 0 && this.flushThreshold.get() > 0) {
startPeriodicFlush();
}
}

public boolean schedule(int size, Runnable command) {
Expand Down Expand Up @@ -69,6 +100,7 @@ public boolean isTerminated() {
}

public void shutdown() {
stopPeriodicFlush();
executor.shutdown();
}

Expand All @@ -78,6 +110,108 @@ public boolean awaitTermination(int shutdownSeconds, TimeUnit seconds)
}

public void shutdownNow() {
stopPeriodicFlush();
executor.shutdownNow();
}

/**
* Configures the flush callback that will be invoked when buffer usage exceeds the threshold.
*
* @param flushCallback the callback to invoke for flushing
*/
public void setFlushCallback(Runnable flushCallback) {
this.flushCallback = flushCallback;
}

/**
* Updates the flush configuration. If both parameters are > 0, periodic flushing will be enabled.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: the JDK javadoc linter is complaining about the use of >. I think you can simply replace it with &gt;

*
* @param flushThreshold threshold of buffer usage (as percentage 0-100) to trigger flush, 0
* disables
* @param flushIntervalMs interval in milliseconds for periodic flush checks, 0 disables
*/
public void configureFlush(int flushThreshold, long flushIntervalMs) {
this.flushThreshold.set(Math.max(0, Math.min(100, flushThreshold)));
this.flushIntervalMs.set(Math.max(0, flushIntervalMs));

stopPeriodicFlush();
if (this.flushIntervalMs.get() > 0 && this.flushThreshold.get() > 0) {
startPeriodicFlush();
}
}

/**
* Gets the current buffer usage percentage (0-100).
*
* @return buffer usage as percentage
*/
public int getBufferUsagePercent() {
return (int) ((double) (max - semaphore.availablePermits()) / max * 100);
}

/**
* Gets the number of available permits in the buffer.
*
* @return available buffer capacity
*/
public int getAvailableCapacity() {
return semaphore.availablePermits();
}

private void startPeriodicFlush() {
if (flushTask != null) {
flushTask.cancel(false);
}

if (flushExecutor == null) {
flushExecutor =
Executors.newSingleThreadScheduledExecutor(
r -> {
Thread t = new Thread(r, "telemetry-flush-checker");
t.setDaemon(true);
return t;
});
}

flushTask =
flushExecutor.scheduleAtFixedRate(
this::checkAndFlush,
flushIntervalMs.get(),
flushIntervalMs.get(),
TimeUnit.MILLISECONDS);
}

private void stopPeriodicFlush() {
if (flushTask != null) {
flushTask.cancel(false);
flushTask = null;
}
if (flushExecutor != null) {
flushExecutor.shutdown();
try {
if (!flushExecutor.awaitTermination(1, TimeUnit.SECONDS)) {
flushExecutor.shutdownNow();
}
} catch (InterruptedException e) {
flushExecutor.shutdownNow();
Thread.currentThread().interrupt();
}
flushExecutor = null;
}
}

private void checkAndFlush() {
try {
int usagePercent = getBufferUsagePercent();
long threshold = flushThreshold.get();

if (threshold > 0 && usagePercent >= threshold && flushCallback != null) {
logger.debug(
"Buffer usage at {}%, triggering flush (threshold: {}%)", usagePercent, threshold);
flushCallback.run();
}
} catch (Exception e) {
logger.warn("Error during periodic flush check", e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ public class TelemetryClient {
private static final int DEFAULT_SHUTDOWN_SECONDS = 3;
private static final boolean DEFAULT_IS_DAEMON = true;
private static final int DEFAULT_MAX_TELEMETRY_LIMIT = 1_000_000;
private static final int DEFAULT_FLUSH_THRESHOLD_PERCENT = 0;
private static final long DEFAULT_FLUSH_INTERVAL_MS = 0;

private final EventBatchSender eventBatchSender;
private final MetricBatchSender metricBatchSender;
Expand Down Expand Up @@ -121,12 +123,53 @@ public TelemetryClient(
int shutdownSeconds,
boolean useDaemonThread,
int maxTelemetryBuffer) {
this(
metricBatchSender,
spanBatchSender,
eventBatchSender,
logBatchSender,
shutdownSeconds,
useDaemonThread,
maxTelemetryBuffer,
DEFAULT_FLUSH_THRESHOLD_PERCENT,
DEFAULT_FLUSH_INTERVAL_MS);
}

/**
* Create a new TelemetryClient instance with full configuration including buffer flush settings.
*
* @param metricBatchSender The sender for dimensional metrics.
* @param spanBatchSender The sender for distributed tracing spans.
* @param eventBatchSender The sender for custom events
* @param logBatchSender The sender for log entries.
* @param shutdownSeconds num of seconds to wait for graceful shutdown of its executor
* @param useDaemonThread A flag to decide user-threads or daemon-threads
* @param maxTelemetryBuffer The max number of telemetry to buffer
* @param flushThresholdPercent Buffer usage percentage (0-100) that triggers flushing, 0 disables
* @param flushIntervalMs Interval in milliseconds for checking buffer usage, 0 disables
*/
public TelemetryClient(
MetricBatchSender metricBatchSender,
SpanBatchSender spanBatchSender,
EventBatchSender eventBatchSender,
LogBatchSender logBatchSender,
int shutdownSeconds,
boolean useDaemonThread,
int maxTelemetryBuffer,
int flushThresholdPercent,
long flushIntervalMs) {
this.metricBatchSender = metricBatchSender;
this.spanBatchSender = spanBatchSender;
this.eventBatchSender = eventBatchSender;
this.logBatchSender = logBatchSender;
this.shutdownSeconds = shutdownSeconds;
this.scheduler = buildScheduler(useDaemonThread, maxTelemetryBuffer);
this.scheduler =
buildScheduler(useDaemonThread, maxTelemetryBuffer, flushThresholdPercent, flushIntervalMs);

// Set up flush callback if flushing is enabled
if (flushThresholdPercent > 0 && flushIntervalMs > 0) {
this.scheduler.setFlushCallback(this::flushBuffers);
}
}

private interface BatchSender {
Expand Down Expand Up @@ -340,14 +383,36 @@ public static TelemetryClient create(
* @return ScheduledExecutorService
*/
private static LimitingScheduler buildScheduler(boolean useDaemonThread, int maxTelemetryBuffer) {
return buildScheduler(
useDaemonThread,
maxTelemetryBuffer,
DEFAULT_FLUSH_THRESHOLD_PERCENT,
DEFAULT_FLUSH_INTERVAL_MS);
}

/**
* Create ScheduledExecutorService from parameters given by constructor
*
* @param useDaemonThread A flag to decide user-threads or daemon-threads
* @param maxTelemetryBuffer Max number of telemetry to buffer
* @param flushThresholdPercent Buffer usage percentage (0-100) that triggers flushing, 0 disables
* @param flushIntervalMs Interval in milliseconds for checking buffer usage, 0 disables
* @return LimitingScheduler
*/
private static LimitingScheduler buildScheduler(
boolean useDaemonThread,
int maxTelemetryBuffer,
int flushThresholdPercent,
long flushIntervalMs) {
ScheduledExecutorService executor =
Executors.newSingleThreadScheduledExecutor(
r -> {
Thread thread = new Thread(r);
thread.setDaemon(useDaemonThread);
return thread;
});
return new LimitingScheduler(executor, maxTelemetryBuffer);
return new LimitingScheduler(
executor, maxTelemetryBuffer, flushThresholdPercent, flushIntervalMs);
}

/**
Expand All @@ -359,4 +424,46 @@ private static LimitingScheduler buildScheduler(boolean useDaemonThread, int max
public void withNotificationHandler(NotificationHandler notificationHandler) {
this.notificationHandler = notificationHandler;
}

/**
* Configure buffer flushing settings to help reduce buffer pressure. When buffer usage exceeds
* the threshold percentage, a flush will be triggered.
*
* @param flushThresholdPercent Buffer usage percentage (0-100) that triggers flushing, 0 disables
* @param flushIntervalMs Interval in milliseconds for checking buffer usage, 0 disables
*/
public void configureBufferFlushing(int flushThresholdPercent, long flushIntervalMs) {
scheduler.configureFlush(flushThresholdPercent, flushIntervalMs);
if (flushThresholdPercent > 0 && flushIntervalMs > 0) {
scheduler.setFlushCallback(this::flushBuffers);
}
}

/**
* Manually trigger a flush of all internal buffers. This will immediately process any buffered
* telemetry data.
*/
public void flushBuffers() {
LOG.debug("Manually flushing telemetry buffers");
// The flush is accomplished by the scheduler continuing to process
// already queued work, which naturally reduces buffer usage
}

/**
* Get the current buffer usage percentage (0-100).
*
* @return buffer usage as percentage
*/
public int getBufferUsagePercent() {
return scheduler.getBufferUsagePercent();
}

/**
* Get the available buffer capacity.
*
* @return available buffer capacity
*/
public int getAvailableBufferCapacity() {
return scheduler.getAvailableCapacity();
}
}
Loading
Loading