Skip to content

Commit eb1107e

Browse files
committed
test(integration): improve tests
Signed-off-by: Marcos Tischer Vallim <tischer@gmail.com>
1 parent 102da8b commit eb1107e

25 files changed

Lines changed: 2398 additions & 545 deletions

amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/concurrent/AmazonSnsThreadPoolExecutor.java

Lines changed: 1 addition & 82 deletions
Original file line numberDiff line numberDiff line change
@@ -16,96 +16,15 @@
1616

1717
package com.amazon.sns.messaging.lib.concurrent;
1818

19-
import java.util.Objects;
2019
import java.util.concurrent.SynchronousQueue;
2120
import java.util.concurrent.ThreadPoolExecutor;
2221
import java.util.concurrent.TimeUnit;
23-
import java.util.concurrent.atomic.AtomicInteger;
2422

25-
/**
26-
* A custom {@link ThreadPoolExecutor} that tracks active, failed, and succeeded task counts.
27-
* Uses a blocking submission policy to handle rejection and a synchronous queue for task handoff.
28-
*/
23+
// @formatter:off
2924
public class AmazonSnsThreadPoolExecutor extends ThreadPoolExecutor {
3025

31-
private final AtomicInteger activeTaskCount = new AtomicInteger();
32-
33-
private final AtomicInteger failedTaskCount = new AtomicInteger();
34-
35-
private final AtomicInteger succeededTaskCount = new AtomicInteger();
36-
37-
/**
38-
* Creates a new executor with the specified maximum pool size.
39-
*
40-
* @param maximumPoolSize the maximum number of threads to allow in the pool
41-
*/
4226
public AmazonSnsThreadPoolExecutor(final int maximumPoolSize) {
4327
super(0, maximumPoolSize, 60, TimeUnit.SECONDS, new SynchronousQueue<>(), ThreadFactoryProvider.getThreadFactory(), new BlockingSubmissionPolicy(30000));
4428
}
4529

46-
/**
47-
* Returns the number of currently active tasks.
48-
*
49-
* @return the active task count
50-
*/
51-
public int getActiveTaskCount() {
52-
return activeTaskCount.get();
53-
}
54-
55-
/**
56-
* Returns the number of tasks that have failed.
57-
*
58-
* @return the failed task count
59-
*/
60-
public int getFailedTaskCount() {
61-
return failedTaskCount.get();
62-
}
63-
64-
/**
65-
* Returns the number of tasks that have completed successfully.
66-
*
67-
* @return the succeeded task count
68-
*/
69-
public int getSucceededTaskCount() {
70-
return succeededTaskCount.get();
71-
}
72-
73-
/**
74-
* Returns the current size of the task queue.
75-
*
76-
* @return the queue size
77-
*/
78-
public int getQueueSize() {
79-
return getQueue().size();
80-
}
81-
82-
/**
83-
* {@inheritDoc}
84-
*/
85-
@Override
86-
protected void beforeExecute(final Thread thread, final Runnable runnable) {
87-
try {
88-
super.beforeExecute(thread, runnable);
89-
} finally {
90-
activeTaskCount.incrementAndGet();
91-
}
92-
}
93-
94-
/**
95-
* {@inheritDoc}
96-
*/
97-
@Override
98-
protected void afterExecute(final Runnable runnable, final Throwable throwable) {
99-
try {
100-
super.afterExecute(runnable, throwable);
101-
} finally {
102-
if (Objects.nonNull(throwable)) {
103-
failedTaskCount.incrementAndGet();
104-
} else {
105-
succeededTaskCount.incrementAndGet();
106-
}
107-
activeTaskCount.decrementAndGet();
108-
}
109-
}
110-
11130
}

amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/concurrent/ExecutorsProvider.java

Lines changed: 0 additions & 102 deletions
This file was deleted.

amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/core/AbstractAmazonSnsProducer.java

Lines changed: 22 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,9 @@
1616

1717
package com.amazon.sns.messaging.lib.core;
1818

19-
import java.util.List;
2019
import java.util.concurrent.BlockingQueue;
2120
import java.util.concurrent.ConcurrentMap;
22-
import java.util.concurrent.ExecutorService;
23-
import java.util.concurrent.TimeUnit;
24-
25-
import org.slf4j.Logger;
26-
import org.slf4j.LoggerFactory;
21+
import java.util.concurrent.atomic.AtomicReference;
2722

2823
import com.amazon.sns.messaging.lib.model.RequestEntry;
2924
import com.amazon.sns.messaging.lib.model.ResponseFailEntry;
@@ -44,14 +39,12 @@
4439
@RequiredArgsConstructor(access = AccessLevel.PROTECTED)
4540
abstract class AbstractAmazonSnsProducer<E> {
4641

47-
private static final Logger LOGGER = LoggerFactory.getLogger(AbstractAmazonSnsProducer.class);
42+
private final AtomicReference<State> state = new AtomicReference<>(State.RUNNIG);
4843

4944
private final ConcurrentMap<String, ListenableFuture<ResponseSuccessEntry, ResponseFailEntry>> pendingRequests;
5045

5146
private final BlockingQueue<RequestEntry<E>> topicRequests;
5247

53-
private final ExecutorService executorService;
54-
5548
/**
5649
* Sends a request entry by enqueuing it for batch processing.
5750
*
@@ -60,25 +53,27 @@ abstract class AbstractAmazonSnsProducer<E> {
6053
*/
6154
@SneakyThrows
6255
public ListenableFuture<ResponseSuccessEntry, ResponseFailEntry> send(final RequestEntry<E> requestEntry) {
63-
return enqueueRequest(requestEntry);
64-
}
56+
if (State.RUNNIG.equals(state.get())) {
57+
return enqueueRequest(requestEntry);
58+
} else {
59+
final ListenableFutureImpl listenableFutureImpl = new ListenableFutureImpl();
6560

66-
/**
67-
* Shuts down the producer's executor service gracefully, waiting up to 60 seconds
68-
* for termination.
69-
*/
70-
@SneakyThrows
71-
public void shutdown() {
72-
LOGGER.warn("Shutdown producer {}", getClass().getSimpleName());
61+
listenableFutureImpl.fail(ResponseFailEntry.builder()
62+
.withCode("000")
63+
.withId(requestEntry.getId())
64+
.withMessage(String.format("Producer is currently in %s mode; no further messages will be accepted.", state.get().name()))
65+
.withSenderFault(true)
66+
.build()
67+
);
7368

74-
executorService.shutdown();
75-
if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {
76-
LOGGER.warn("Executor service did not terminate in the specified time.");
77-
final List<Runnable> droppedTasks = executorService.shutdownNow();
78-
LOGGER.warn("Executor service was abruptly shut down. {} tasks will not be executed.", droppedTasks.size());
69+
return listenableFutureImpl;
7970
}
8071
}
8172

73+
public void shutdown() {
74+
state.compareAndSet(State.RUNNIG, State.SHUTDOWN);
75+
}
76+
8277
/**
8378
* Creates a {@link ListenableFuture} for the request, registers it in the pending map,
8479
* and enqueues the request for batch processing.
@@ -94,5 +89,9 @@ private ListenableFuture<ResponseSuccessEntry, ResponseFailEntry> enqueueRequest
9489
return trackPendingRequest;
9590
}
9691

92+
enum State {
93+
RUNNIG, SHUTDOWN
94+
}
95+
9796
}
9897
// @formatter:on

amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/core/AbstractAmazonSnsTemplate.java

Lines changed: 88 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,15 +16,29 @@
1616

1717
package com.amazon.sns.messaging.lib.core;
1818

19+
import java.util.Objects;
20+
import java.util.concurrent.BlockingQueue;
1921
import java.util.concurrent.CompletableFuture;
22+
import java.util.concurrent.ConcurrentHashMap;
23+
import java.util.concurrent.ConcurrentMap;
24+
import java.util.concurrent.ExecutorService;
25+
import java.util.function.Function;
26+
import java.util.function.UnaryOperator;
2027

2128
import com.amazon.sns.messaging.lib.concurrent.AmazonSnsThreadPoolExecutor;
29+
import com.amazon.sns.messaging.lib.concurrent.RingBufferBlockingQueue;
30+
import com.amazon.sns.messaging.lib.metrics.BlockingQueueMetricsDecorator;
31+
import com.amazon.sns.messaging.lib.metrics.ExecutorServiceMetricsDecorator;
2232
import com.amazon.sns.messaging.lib.model.RequestEntry;
2333
import com.amazon.sns.messaging.lib.model.ResponseFailEntry;
2434
import com.amazon.sns.messaging.lib.model.ResponseSuccessEntry;
2535
import com.amazon.sns.messaging.lib.model.TopicProperty;
36+
import com.fasterxml.jackson.databind.ObjectMapper;
2637

38+
import io.micrometer.core.instrument.MeterRegistry;
39+
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
2740
import lombok.AccessLevel;
41+
import lombok.Getter;
2842
import lombok.RequiredArgsConstructor;
2943

3044
// @formatter:off
@@ -79,8 +93,80 @@ public CompletableFuture<Void> await() {
7993
* @param topicProperty the topic configuration
8094
* @return a configured thread pool executor
8195
*/
82-
protected static AmazonSnsThreadPoolExecutor getAmazonSnsThreadPoolExecutor(final TopicProperty topicProperty) {
83-
return topicProperty.isFifo() ? new AmazonSnsThreadPoolExecutor(1) : new AmazonSnsThreadPoolExecutor(topicProperty.getMaximumPoolSize());
96+
protected static ExecutorService getExecutorService(final TopicProperty topicProperty, final MeterRegistry meterRegistry) {
97+
return topicProperty.isFifo()
98+
? new ExecutorServiceMetricsDecorator(
99+
new AmazonSnsThreadPoolExecutor(1),
100+
meterRegistry,
101+
topicProperty.getTopicArn()
102+
)
103+
: new ExecutorServiceMetricsDecorator(
104+
new AmazonSnsThreadPoolExecutor(topicProperty.getMaximumPoolSize()),
105+
meterRegistry,
106+
topicProperty.getTopicArn()
107+
);
108+
}
109+
110+
@Getter
111+
public static final class Builder<C, R, O, E, T extends AbstractAmazonSnsTemplate<C, R, O, E>> {
112+
113+
private final C amazonSnsClient;
114+
115+
private final TopicProperty topicProperty;
116+
117+
private ConcurrentMap<String, ListenableFuture<ResponseSuccessEntry, ResponseFailEntry>> pendingRequests = new ConcurrentHashMap<>();
118+
119+
private BlockingQueue<RequestEntry<E>> topicRequests;
120+
121+
private ObjectMapper objectMapper = new ObjectMapper();
122+
123+
private UnaryOperator<R> publishDecorator = UnaryOperator.identity();
124+
125+
private MeterRegistry meterRegistry = new SimpleMeterRegistry();
126+
127+
private final Function<Builder<C, R, O, E, T>, T> constructor;
128+
129+
Builder(final Function<Builder<C, R, O, E, T>, T> constructor, final C amazonSnsClient, final TopicProperty topicProperty) {
130+
this.amazonSnsClient = Objects.requireNonNull(amazonSnsClient, "amazonSnsClient");
131+
this.topicProperty = Objects.requireNonNull(topicProperty, "topicProperty");
132+
this.constructor = Objects.requireNonNull(constructor, "constructor");
133+
}
134+
135+
public Builder<C, R, O, E, T> pendingRequests(final ConcurrentMap<String, ListenableFuture<ResponseSuccessEntry, ResponseFailEntry>> pendingRequests) {
136+
this.pendingRequests = Objects.requireNonNull(pendingRequests, "pendingRequests");
137+
return this;
138+
}
139+
140+
public Builder<C, R, O, E, T> topicRequests(final BlockingQueue<RequestEntry<E>> topicRequests) {
141+
this.topicRequests = Objects.requireNonNull(topicRequests, "topicRequests");
142+
return this;
143+
}
144+
145+
public Builder<C, R, O, E, T> objectMapper(final ObjectMapper objectMapper) {
146+
this.objectMapper = Objects.requireNonNull(objectMapper, "objectMapper");
147+
return this;
148+
}
149+
150+
public Builder<C, R, O, E, T> publishDecorator(final UnaryOperator<R> publishDecorator) {
151+
this.publishDecorator = Objects.requireNonNull(publishDecorator, "publishDecorator");
152+
return this;
153+
}
154+
155+
public Builder<C, R, O, E, T> meterRegistry(final MeterRegistry meterRegistry) {
156+
this.meterRegistry = Objects.requireNonNull(meterRegistry, "meterRegistry");
157+
return this;
158+
}
159+
160+
public T build() {
161+
if (Objects.isNull(topicRequests)) {
162+
topicRequests = new RingBufferBlockingQueue<>(topicProperty.getMaximumPoolSize() * topicProperty.getMaxBatchSize());
163+
}
164+
165+
topicRequests = new BlockingQueueMetricsDecorator<>(topicRequests, meterRegistry, topicProperty.getTopicArn());
166+
167+
return constructor.apply(this);
168+
}
169+
84170
}
85171

86172
}

0 commit comments

Comments
 (0)