Skip to content

Commit 8ae499a

Browse files
committed
chore: improve documentation
Signed-off-by: Marcos Tischer Vallim <tischer@gmail.com>
1 parent 102da8b commit 8ae499a

30 files changed

Lines changed: 2678 additions & 532 deletions

amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/concurrent/AmazonSnsThreadPoolExecutor.java

Lines changed: 7 additions & 77 deletions
Original file line numberDiff line numberDiff line change
@@ -16,96 +16,26 @@
1616

1717
package com.amazon.sns.messaging.lib.concurrent;
1818

19-
import java.util.Objects;
2019
import java.util.concurrent.SynchronousQueue;
2120
import java.util.concurrent.ThreadPoolExecutor;
2221
import java.util.concurrent.TimeUnit;
23-
import java.util.concurrent.atomic.AtomicInteger;
2422

23+
// @formatter:off
2524
/**
26-
* A custom {@link ThreadPoolExecutor} that tracks active, failed, and succeeded task counts.
27-
* Uses a blocking submission policy to handle rejection and a synchronous queue for task handoff.
25+
* A {@link ThreadPoolExecutor} configured for Amazon SNS publishing. Uses a
26+
* {@link SynchronousQueue} with zero core threads, allowing threads to be created
27+
* on demand up to the specified maximum pool size. Tasks that cannot be accepted
28+
* immediately by the queue will block up to 30 seconds via {@link BlockingSubmissionPolicy}.
2829
*/
2930
public class AmazonSnsThreadPoolExecutor extends ThreadPoolExecutor {
3031

31-
private final AtomicInteger activeTaskCount = new AtomicInteger();
32-
33-
private final AtomicInteger failedTaskCount = new AtomicInteger();
34-
35-
private final AtomicInteger succeededTaskCount = new AtomicInteger();
36-
3732
/**
38-
* Creates a new executor with the specified maximum pool size.
33+
* Creates a new thread pool executor with the given maximum pool size.
3934
*
40-
* @param maximumPoolSize the maximum number of threads to allow in the pool
35+
* @param maximumPoolSize the maximum number of threads allowed in the pool
4136
*/
4237
public AmazonSnsThreadPoolExecutor(final int maximumPoolSize) {
4338
super(0, maximumPoolSize, 60, TimeUnit.SECONDS, new SynchronousQueue<>(), ThreadFactoryProvider.getThreadFactory(), new BlockingSubmissionPolicy(30000));
4439
}
4540

46-
/**
47-
* Returns the number of currently active tasks.
48-
*
49-
* @return the active task count
50-
*/
51-
public int getActiveTaskCount() {
52-
return activeTaskCount.get();
53-
}
54-
55-
/**
56-
* Returns the number of tasks that have failed.
57-
*
58-
* @return the failed task count
59-
*/
60-
public int getFailedTaskCount() {
61-
return failedTaskCount.get();
62-
}
63-
64-
/**
65-
* Returns the number of tasks that have completed successfully.
66-
*
67-
* @return the succeeded task count
68-
*/
69-
public int getSucceededTaskCount() {
70-
return succeededTaskCount.get();
71-
}
72-
73-
/**
74-
* Returns the current size of the task queue.
75-
*
76-
* @return the queue size
77-
*/
78-
public int getQueueSize() {
79-
return getQueue().size();
80-
}
81-
82-
/**
83-
* {@inheritDoc}
84-
*/
85-
@Override
86-
protected void beforeExecute(final Thread thread, final Runnable runnable) {
87-
try {
88-
super.beforeExecute(thread, runnable);
89-
} finally {
90-
activeTaskCount.incrementAndGet();
91-
}
92-
}
93-
94-
/**
95-
* {@inheritDoc}
96-
*/
97-
@Override
98-
protected void afterExecute(final Runnable runnable, final Throwable throwable) {
99-
try {
100-
super.afterExecute(runnable, throwable);
101-
} finally {
102-
if (Objects.nonNull(throwable)) {
103-
failedTaskCount.incrementAndGet();
104-
} else {
105-
succeededTaskCount.incrementAndGet();
106-
}
107-
activeTaskCount.decrementAndGet();
108-
}
109-
}
110-
11141
}

amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/concurrent/ExecutorsProvider.java

Lines changed: 0 additions & 102 deletions
This file was deleted.

amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/concurrent/RingBufferBlockingQueue.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -283,6 +283,11 @@ public int drainTo(final Collection<? super E> collection, final int maxElements
283283
throw new UnsupportedOperationException();
284284
}
285285

286+
/**
287+
* Internal entry wrapper that holds a value within the ring buffer.
288+
*
289+
* @param <E> the type of the value
290+
*/
286291
@Getter
287292
@Setter
288293
static class Entry<E> {

amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/core/AbstractAmazonSnsConsumer.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,8 +65,14 @@
6565
*/
6666
abstract class AbstractAmazonSnsConsumer<C, R, O, E> implements Runnable {
6767

68+
/**
69+
* Kilobyte constant used for size calculations.
70+
*/
6871
private static final Integer KB = 1024;
6972

73+
/**
74+
* Maximum batch size threshold of 256 KB imposed by Amazon SNS.
75+
*/
7076
private static final Integer BATCH_SIZE_BYTES_THRESHOLD = 256 * AbstractAmazonSnsConsumer.KB;
7177

7278
private static final Logger LOGGER = LoggerFactory.getLogger(AbstractAmazonSnsConsumer.class);

amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/core/AbstractAmazonSnsProducer.java

Lines changed: 27 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,9 @@
1616

1717
package com.amazon.sns.messaging.lib.core;
1818

19-
import java.util.List;
2019
import java.util.concurrent.BlockingQueue;
2120
import java.util.concurrent.ConcurrentMap;
22-
import java.util.concurrent.ExecutorService;
23-
import java.util.concurrent.TimeUnit;
24-
25-
import org.slf4j.Logger;
26-
import org.slf4j.LoggerFactory;
21+
import java.util.concurrent.atomic.AtomicReference;
2722

2823
import com.amazon.sns.messaging.lib.model.RequestEntry;
2924
import com.amazon.sns.messaging.lib.model.ResponseFailEntry;
@@ -44,14 +39,12 @@
4439
@RequiredArgsConstructor(access = AccessLevel.PROTECTED)
4540
abstract class AbstractAmazonSnsProducer<E> {
4641

47-
private static final Logger LOGGER = LoggerFactory.getLogger(AbstractAmazonSnsProducer.class);
42+
private final AtomicReference<State> state = new AtomicReference<>(State.RUNNIG);
4843

4944
private final ConcurrentMap<String, ListenableFuture<ResponseSuccessEntry, ResponseFailEntry>> pendingRequests;
5045

5146
private final BlockingQueue<RequestEntry<E>> topicRequests;
5247

53-
private final ExecutorService executorService;
54-
5548
/**
5649
* Sends a request entry by enqueuing it for batch processing.
5750
*
@@ -60,23 +53,29 @@ abstract class AbstractAmazonSnsProducer<E> {
6053
*/
6154
@SneakyThrows
6255
public ListenableFuture<ResponseSuccessEntry, ResponseFailEntry> send(final RequestEntry<E> requestEntry) {
63-
return enqueueRequest(requestEntry);
56+
if (State.RUNNIG.equals(state.get())) {
57+
return enqueueRequest(requestEntry);
58+
} else {
59+
final ListenableFutureImpl listenableFutureImpl = new ListenableFutureImpl();
60+
61+
listenableFutureImpl.fail(ResponseFailEntry.builder()
62+
.withCode("000")
63+
.withId(requestEntry.getId())
64+
.withMessage(String.format("Producer is currently in %s mode; no further messages will be accepted.", state.get().name()))
65+
.withSenderFault(true)
66+
.build()
67+
);
68+
69+
return listenableFutureImpl;
70+
}
6471
}
6572

6673
/**
67-
* Shuts down the producer's executor service gracefully, waiting up to 60 seconds
68-
* for termination.
74+
* Transitions the producer to the shutdown state. No further messages will be
75+
* accepted once shutdown.
6976
*/
70-
@SneakyThrows
7177
public void shutdown() {
72-
LOGGER.warn("Shutdown producer {}", getClass().getSimpleName());
73-
74-
executorService.shutdown();
75-
if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {
76-
LOGGER.warn("Executor service did not terminate in the specified time.");
77-
final List<Runnable> droppedTasks = executorService.shutdownNow();
78-
LOGGER.warn("Executor service was abruptly shut down. {} tasks will not be executed.", droppedTasks.size());
79-
}
78+
state.compareAndSet(State.RUNNIG, State.SHUTDOWN);
8079
}
8180

8281
/**
@@ -94,5 +93,12 @@ private ListenableFuture<ResponseSuccessEntry, ResponseFailEntry> enqueueRequest
9493
return trackPendingRequest;
9594
}
9695

96+
/**
97+
* Lifecycle states of the producer.
98+
*/
99+
enum State {
100+
RUNNIG, SHUTDOWN
101+
}
102+
97103
}
98104
// @formatter:on

0 commit comments

Comments
 (0)