Skip to content

Commit c400c77

Browse files
committed
Add configuration option to to allow customizing the internal executor queue
1 parent 2f8600c commit c400c77

3 files changed

Lines changed: 44 additions & 8 deletions

File tree

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
{
2+
"type": "feature",
3+
"category": "CloudWatch Metric Publisher",
4+
"contributor": "",
5+
"description": "Add `taskQueue` configuration option to `CloudWatchMetricPublisher.Builder` to allow customizing the internal executor queue. This enables high-throughput applications to use a larger queue to prevent dropped metrics."
6+
}

metric-publishers/cloudwatch-metric-publisher/src/main/java/software/amazon/awssdk/metrics/publishers/cloudwatch/CloudWatchMetricPublisher.java

Lines changed: 24 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -170,15 +170,10 @@
170170
@SdkPublicApi
171171
public final class CloudWatchMetricPublisher implements MetricPublisher {
172172
/**
173-
* The maximum queue size for the internal {@link #executor} that is used to aggregate metric data and upload it to
173+
* The default maximum queue size for the internal {@link #executor} that is used to aggregate metric data and upload it to
174174
* CloudWatch. If this value is too high, memory is wasted. If this value is too low, metrics could be dropped.
175-
*
176-
* This value is not currently configurable, because it's unlikely that this is a value that customers should need to modify.
177-
* If customers really need control over this value, we might consider letting them instead configure the
178-
* {@link BlockingQueue} used on the executor. The value here depends on the type of {@code BlockingQueue} in use, and
179-
* we should probably not indirectly couple people to the type of blocking queue we're using.
180175
*/
181-
private static final int MAXIMUM_TASK_QUEUE_SIZE = 128;
176+
private static final int DEFAULT_TASK_QUEUE_SIZE = 128;
182177

183178
private static final String DEFAULT_NAMESPACE = "AwsSdk/JavaSdk2";
184179
private static final int DEFAULT_MAXIMUM_CALLS_PER_UPLOAD = 10;
@@ -241,7 +236,7 @@ private CloudWatchMetricPublisher(Builder builder) {
241236

242237
// Do not increase above 1 thread: access to MetricCollectionAggregator is not thread safe.
243238
this.executor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
244-
new ArrayBlockingQueue<>(MAXIMUM_TASK_QUEUE_SIZE),
239+
resolveTaskQueue(builder),
245240
threadFactory);
246241

247242
long flushFrequencyInMillis = resolveUploadFrequency(builder).toMillis();
@@ -285,6 +280,10 @@ private int resolveMaximumCallsPerUpload(Builder builder) {
285280
return builder.maximumCallsPerUpload == null ? DEFAULT_MAXIMUM_CALLS_PER_UPLOAD : builder.maximumCallsPerUpload;
286281
}
287282

283+
private BlockingQueue<Runnable> resolveTaskQueue(Builder builder) {
284+
return builder.taskQueue == null ? new ArrayBlockingQueue<>(DEFAULT_TASK_QUEUE_SIZE) : builder.taskQueue;
285+
}
286+
288287
@Override
289288
public void publish(MetricCollection metricCollection) {
290289
try {
@@ -395,6 +394,7 @@ public static final class Builder {
395394
private Collection<MetricCategory> metricCategories;
396395
private MetricLevel metricLevel;
397396
private Collection<SdkMetric<?>> detailedMetrics;
397+
private BlockingQueue<Runnable> taskQueue;
398398

399399
private Builder() {
400400
}
@@ -467,6 +467,22 @@ public Builder maximumCallsPerUpload(Integer maximumCallsPerUpload) {
467467
return this;
468468
}
469469

470+
/**
471+
* Configure the {@link BlockingQueue} used by the internal executor for queuing metric aggregation and upload tasks.
472+
*
473+
* <p>If this is not specified, a blocking queue with a capacity of 128 is used.
474+
*
475+
* <p>High-throughput applications may need a larger queue to prevent dropped metrics. When the queue is full, new
476+
* metrics are dropped and a warning is logged.
477+
*
478+
* @param taskQueue the blocking queue to use for the internal executor
479+
* @return This object for method chaining.
480+
*/
481+
public Builder taskQueue(BlockingQueue<Runnable> taskQueue) {
482+
this.taskQueue = taskQueue;
483+
return this;
484+
}
485+
470486
/**
471487
* Configure the {@link SdkMetric}s that are used to define the {@link Dimension}s metrics are aggregated under.
472488
*

metric-publishers/cloudwatch-metric-publisher/src/test/java/software/amazon/awssdk/metrics/publishers/cloudwatch/CloudWatchMetricPublisherTest.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121

2222
import java.time.Duration;
2323
import java.util.List;
24+
import java.util.concurrent.ArrayBlockingQueue;
25+
import java.util.concurrent.BlockingQueue;
2426
import java.util.concurrent.CompletableFuture;
2527
import java.util.concurrent.CountDownLatch;
2628
import org.junit.Before;
@@ -268,6 +270,18 @@ public void detailedMetricsSettingIsHonored() {
268270
assertThat(availableConcurrency.statisticValues()).isNull();
269271
}
270272

273+
@Test
274+
public void taskQueueSettingIsHonored() {
275+
ArrayBlockingQueue<Runnable> customQueue = Mockito.spy(new ArrayBlockingQueue<>(128));
276+
try (CloudWatchMetricPublisher publisher = publisherBuilder.taskQueue(customQueue).build()) {
277+
MetricCollector collector = newCollector();
278+
collector.reportMetric(HttpMetric.AVAILABLE_CONCURRENCY, 5);
279+
publisher.publish(new FixedTimeMetricCollection(collector.collect()));
280+
}
281+
282+
Mockito.verify(customQueue, Mockito.atLeastOnce()).offer(any(Runnable.class));
283+
}
284+
271285
private MetricDatum getDatum(PutMetricDataRequest call, SdkMetric<?> metric) {
272286
return call.metricData().stream().filter(m -> m.metricName().equals(metric.name())).findAny().get();
273287
}

0 commit comments

Comments
 (0)