Skip to content

Commit 9201b04

Browse files
authored
Merge pull request #140 from mvallim/feature/test-container
Auto-created pull request into `develop` from `feature/test-container`
2 parents 999007c + a2b2b40 commit 9201b04

27 files changed

Lines changed: 1522 additions & 86 deletions

amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/concurrent/BlockingSubmissionPolicy.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,8 @@
3030
* is thrown.
3131
*/
3232
public class BlockingSubmissionPolicy implements RejectedExecutionHandler {
33-
33+
34+
/** The maximum time to wait for queue insertion, in milliseconds. */
3435
private final long timeout;
3536

3637
/**

amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/concurrent/RingBufferBlockingQueue.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,22 +42,31 @@
4242
*/
4343
public class RingBufferBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E> {
4444

45+
/** Default capacity when no explicit capacity is provided. */
4546
private static final int DEFAULT_CAPACITY = 2048;
4647

48+
/** The ring buffer array holding queue entries. */
4749
private final AtomicReferenceArray<Entry<E>> buffer;
4850

51+
/** The fixed maximum number of elements the queue can hold. */
4952
private final int capacity;
5053

54+
/** Sequence number tracking the next write position (starts at -1 indicating no writes). */
5155
private final AtomicLong writeSequence = new AtomicLong(-1);
5256

57+
/** Sequence number tracking the next read position. */
5358
private final AtomicLong readSequence = new AtomicLong(0);
5459

60+
/** Current number of elements in the queue. */
5561
private final AtomicInteger size = new AtomicInteger(0);
5662

63+
/** Fair reentrant lock for coordinating producer/consumer access. */
5764
private final ReentrantLock reentrantLock;
5865

66+
/** Condition for consumers waiting when the queue is empty. */
5967
private final Condition waitingConsumer;
6068

69+
/** Condition for producers waiting when the queue is full. */
6170
private final Condition waitingProducer;
6271

6372
/**

amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/concurrent/ThreadFactoryProvider.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,10 @@
3535
@NoArgsConstructor(access = AccessLevel.PRIVATE)
3636
public final class ThreadFactoryProvider {
3737

38+
/** Class logger. */
3839
private static final Logger LOGGER = LoggerFactory.getLogger(ThreadFactoryProvider.class);
39-
40+
41+
/** Cached supplier of the appropriate thread factory for the runtime Java version. */
4042
private static Supplier<ThreadFactory> supplierThreadFactory;
4143

4244
static {

amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/core/AbstractAmazonSnsConsumer.java

Lines changed: 12 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@
6363
* @param <O> the publish batch result type
6464
* @param <E> the request entry payload type
6565
*/
66-
abstract class AbstractAmazonSnsConsumer<C, R, O, E> implements Runnable {
66+
abstract class AbstractAmazonSnsConsumer<C, R, O, E> implements Runnable, AmazonSnsConsumer<R, O> {
6767

6868
/**
6969
* Kilobyte constant used for size calculations.
@@ -75,22 +75,31 @@ abstract class AbstractAmazonSnsConsumer<C, R, O, E> implements Runnable {
7575
*/
7676
private static final Integer BATCH_SIZE_BYTES_THRESHOLD = 256 * AbstractAmazonSnsConsumer.KB;
7777

78+
/** Class logger. */
7879
private static final Logger LOGGER = LoggerFactory.getLogger(AbstractAmazonSnsConsumer.class);
7980

81+
/** Single-thread scheduler that periodically triggers batch draining. */
8082
private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(ThreadFactoryProvider.getThreadFactory());
8183

84+
/** The Amazon SNS client used for publishing batches. */
8285
protected final C amazonSnsClient;
8386

87+
/** The topic configuration properties. */
8488
private final TopicProperty topicProperty;
8589

90+
/** Factory for creating internal request entry representations. */
8691
private final RequestEntryInternalFactory requestEntryInternalFactory;
8792

93+
/** Shared map of pending requests keyed by request ID for async completion. */
8894
protected final ConcurrentMap<String, ListenableFuture<ResponseSuccessEntry, ResponseFailEntry>> pendingRequests;
8995

96+
/** The blocking queue that buffers incoming topic requests. */
9097
private final BlockingQueue<RequestEntry<E>> topicRequests;
9198

99+
/** Optional decorator applied to the publish batch request before sending. */
92100
private final UnaryOperator<R> publishDecorator;
93101

102+
/** Executor service for asynchronous (non-FIFO) publishing. */
94103
private final ExecutorService executorService;
95104

96105
/**
@@ -124,29 +133,6 @@ protected AbstractAmazonSnsConsumer(
124133
scheduledExecutorService.scheduleAtFixedRate(this, 0, topicProperty.getLinger(), TimeUnit.MILLISECONDS);
125134
}
126135

127-
/**
128-
* Publishes a batch request to Amazon SNS.
129-
*
130-
* @param publishBatchRequest the batch request to publish
131-
* @return the publish result
132-
*/
133-
protected abstract O publish(final R publishBatchRequest);
134-
135-
/**
136-
* Handles an error that occurred during publishing.
137-
*
138-
* @param publishBatchRequest the batch request that failed
139-
* @param throwable the exception that was thrown
140-
*/
141-
protected abstract void handleError(final R publishBatchRequest, final Throwable throwable);
142-
143-
/**
144-
* Handles the response from a successful publish call.
145-
*
146-
* @param publishBatchResult the result of the publish operation
147-
*/
148-
protected abstract void handleResponse(final O publishBatchResult);
149-
150136
/**
151137
* Returns a factory function that creates a publish batch request from a topic ARN
152138
* and a list of internal request entries.
@@ -206,6 +192,7 @@ public void run() {
206192
* Shuts down the consumer, waiting up to 60 seconds for both the scheduled and
207193
* worker executor services to terminate.
208194
*/
195+
@Override
209196
@SneakyThrows
210197
public void shutdown() {
211198
await().thenRun(() -> {
@@ -340,6 +327,7 @@ private Optional<R> createBatch(final BlockingQueue<RequestEntry<E>> requests) {
340327
*
341328
* @return a future that completes when all requests are drained
342329
*/
330+
@Override
343331
@SneakyThrows
344332
public CompletableFuture<Void> await() {
345333
return CompletableFuture.runAsync(() -> {

amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/core/AbstractAmazonSnsProducer.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,12 +37,15 @@
3737
* @param <E> the request entry payload type
3838
*/
3939
@RequiredArgsConstructor(access = AccessLevel.PROTECTED)
40-
abstract class AbstractAmazonSnsProducer<E> {
40+
abstract class AbstractAmazonSnsProducer<E> implements AmazonSnsProducer<E> {
4141

42+
/** The producer lifecycle state, initially {@link State#RUNNIG}. */
4243
private final AtomicReference<State> state = new AtomicReference<>(State.RUNNIG);
4344

45+
/** Map of pending requests keyed by request ID for asynchronous completion. */
4446
private final ConcurrentMap<String, ListenableFuture<ResponseSuccessEntry, ResponseFailEntry>> pendingRequests;
4547

48+
/** The blocking queue for buffering requests before batch processing. */
4649
private final BlockingQueue<RequestEntry<E>> topicRequests;
4750

4851
/**
@@ -51,6 +54,7 @@ abstract class AbstractAmazonSnsProducer<E> {
5154
* @param requestEntry the request to enqueue
5255
* @return a {@link ListenableFuture} that tracks the completion of this request
5356
*/
57+
@Override
5458
@SneakyThrows
5559
public ListenableFuture<ResponseSuccessEntry, ResponseFailEntry> send(final RequestEntry<E> requestEntry) {
5660
if (State.RUNNIG.equals(state.get())) {
@@ -74,6 +78,7 @@ public ListenableFuture<ResponseSuccessEntry, ResponseFailEntry> send(final Requ
7478
* Transitions the producer to the shutdown state. No further messages will be
7579
* accepted once shutdown.
7680
*/
81+
@Override
7782
public void shutdown() {
7883
state.compareAndSet(State.RUNNIG, State.SHUTDOWN);
7984
}

amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/core/AbstractAmazonSnsTemplate.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -47,17 +47,16 @@
4747
* sending messages, shutting down, and awaiting completion. Delegates to a producer
4848
* and consumer for actual processing.
4949
*
50-
* @param <C> the Amazon SNS client type
5150
* @param <R> the publish batch request type
5251
* @param <O> the publish batch result type
5352
* @param <E> the request entry payload type
5453
*/
5554
@RequiredArgsConstructor(access = AccessLevel.PROTECTED)
56-
abstract class AbstractAmazonSnsTemplate<C, R, O, E> {
55+
abstract class AbstractAmazonSnsTemplate<R, O, E> {
5756

58-
private final AbstractAmazonSnsProducer<E> amazonSnsProducer;
57+
private final AmazonSnsProducer<E> amazonSnsProducer;
5958

60-
private final AbstractAmazonSnsConsumer<C, R, O, E> amazonSnsConsumer;
59+
private final AmazonSnsConsumer<R, O> amazonSnsConsumer;
6160

6261
/**
6362
* Sends a request entry to the SNS topic asynchronously.
@@ -108,7 +107,7 @@ protected static ExecutorService getExecutorService(final TopicProperty topicPro
108107
}
109108

110109
@Getter
111-
public static final class Builder<C, R, O, E, T extends AbstractAmazonSnsTemplate<C, R, O, E>> {
110+
public static final class Builder<C, R, O, E, T extends AbstractAmazonSnsTemplate<R, O, E>> {
112111

113112
/**
114113
* The Amazon SNS client used for publishing.
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
package com.amazon.sns.messaging.lib.core;
2+
3+
import java.util.concurrent.CompletableFuture;
4+
5+
/**
6+
* Consumer interface for Amazon SNS messaging. Implementations handle batch publishing
7+
* of requests and dispatching of responses or errors to pending request futures.
8+
*
9+
* @param <R> the publish batch request type
10+
* @param <O> the publish batch result type
11+
*/
12+
public interface AmazonSnsConsumer<R, O> {
13+
14+
/**
15+
* Publishes a batch request to Amazon SNS.
16+
*
17+
* @param publishBatchRequest the batch request to publish
18+
* @return the publish result
19+
*/
20+
public abstract O publish(final R publishBatchRequest);
21+
22+
/**
23+
* Handles an error that occurred during publishing.
24+
*
25+
* @param publishBatchRequest the batch request that failed
26+
* @param throwable the exception that was thrown
27+
*/
28+
public void handleError(final R publishBatchRequest, final Throwable throwable);
29+
30+
/**
31+
* Handles the response from a successful publish call.
32+
*
33+
* @param publishBatchResult the result of the publish operation
34+
*/
35+
public void handleResponse(final O publishBatchResult);
36+
37+
/**
38+
* Shuts down the consumer, waiting up to 60 seconds for both the scheduled and
39+
* worker executor services to terminate.
40+
*/
41+
public void shutdown();
42+
43+
/**
44+
* Returns a {@link CompletableFuture} that completes once all pending requests
45+
* have been processed (i.e., both the pending requests map and the topic
46+
* requests queue are empty).
47+
*
48+
* @return a future that completes when all requests are drained
49+
*/
50+
public CompletableFuture<Void> await();
51+
52+
}
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
/*
2+
* Copyright 2023 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.amazon.sns.messaging.lib.core;
18+
19+
import com.amazon.sns.messaging.lib.model.RequestEntry;
20+
import com.amazon.sns.messaging.lib.model.ResponseFailEntry;
21+
import com.amazon.sns.messaging.lib.model.ResponseSuccessEntry;
22+
23+
/**
24+
* Producer interface for Amazon SNS messaging. Implementations enqueue request entries
25+
* for batch publishing and track pending requests for asynchronous completion.
26+
*
27+
* @param <E> the request entry payload type
28+
*/
29+
public interface AmazonSnsProducer<E> {
30+
31+
/**
32+
* Sends a request entry for asynchronous publishing to an SNS topic.
33+
*
34+
* @param requestEntry the request entry containing the message payload and metadata
35+
* @return a {@link ListenableFuture} that completes when the request is processed
36+
*/
37+
public ListenableFuture<ResponseSuccessEntry, ResponseFailEntry> send(final RequestEntry<E> requestEntry);
38+
39+
/**
40+
* Shuts down the producer, preventing any further messages from being accepted.
41+
*/
42+
public void shutdown();
43+
44+
}
45+
// @formatter:on

amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/core/RequestEntryInternalFactory.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,19 +116,26 @@ public Integer messageAttributesSize(final RequestEntry<?> requestEntry) {
116116
@Builder(setterPrefix = "with")
117117
static class RequestEntryInternal {
118118

119+
/** The creation timestamp in nanoseconds. */
119120
private final long createTime;
120121

122+
/** The unique identifier of the request. */
121123
private final String id;
122124

125+
/** The serialized payload as a byte buffer. */
123126
@Getter(value = AccessLevel.PRIVATE)
124127
private final ByteBuffer value;
125128

129+
/** Optional message attributes / headers. */
126130
private final Map<String, Object> messageHeaders;
127131

132+
/** Optional subject line for the message. */
128133
private final String subject;
129134

135+
/** The message group ID for FIFO topics. */
130136
private final String groupId;
131137

138+
/** The message deduplication ID for FIFO topics. */
132139
private final String deduplicationId;
133140

134141
/**

0 commit comments

Comments
 (0)