Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@
* is thrown.
*/
public class BlockingSubmissionPolicy implements RejectedExecutionHandler {


/** The maximum time to wait for queue insertion, in milliseconds. */
private final long timeout;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,22 +42,31 @@
*/
public class RingBufferBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E> {

/** Default capacity when no explicit capacity is provided. */
private static final int DEFAULT_CAPACITY = 2048;

/** The ring buffer array holding queue entries. */
private final AtomicReferenceArray<Entry<E>> buffer;

/** The fixed maximum number of elements the queue can hold. */
private final int capacity;

/** Sequence number tracking the next write position (starts at -1 indicating no writes). */
private final AtomicLong writeSequence = new AtomicLong(-1);

/** Sequence number tracking the next read position. */
private final AtomicLong readSequence = new AtomicLong(0);

/** Current number of elements in the queue. */
private final AtomicInteger size = new AtomicInteger(0);

/** Fair reentrant lock for coordinating producer/consumer access. */
private final ReentrantLock reentrantLock;

/** Condition for consumers waiting when the queue is empty. */
private final Condition waitingConsumer;

/** Condition for producers waiting when the queue is full. */
private final Condition waitingProducer;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,10 @@
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public final class ThreadFactoryProvider {

/** Class logger. */
private static final Logger LOGGER = LoggerFactory.getLogger(ThreadFactoryProvider.class);


/** Cached supplier of the appropriate thread factory for the runtime Java version. */
private static Supplier<ThreadFactory> supplierThreadFactory;

static {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@
* @param <O> the publish batch result type
* @param <E> the request entry payload type
*/
abstract class AbstractAmazonSnsConsumer<C, R, O, E> implements Runnable {
abstract class AbstractAmazonSnsConsumer<C, R, O, E> implements Runnable, AmazonSnsConsumer<R, O> {

/**
* Kilobyte constant used for size calculations.
Expand All @@ -75,22 +75,31 @@ abstract class AbstractAmazonSnsConsumer<C, R, O, E> implements Runnable {
*/
private static final Integer BATCH_SIZE_BYTES_THRESHOLD = 256 * AbstractAmazonSnsConsumer.KB;

/** Class logger. */
private static final Logger LOGGER = LoggerFactory.getLogger(AbstractAmazonSnsConsumer.class);

/** Single-thread scheduler that periodically triggers batch draining. */
private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(ThreadFactoryProvider.getThreadFactory());

/** The Amazon SNS client used for publishing batches. */
protected final C amazonSnsClient;

/** The topic configuration properties. */
private final TopicProperty topicProperty;

/** Factory for creating internal request entry representations. */
private final RequestEntryInternalFactory requestEntryInternalFactory;

/** Shared map of pending requests keyed by request ID for async completion. */
protected final ConcurrentMap<String, ListenableFuture<ResponseSuccessEntry, ResponseFailEntry>> pendingRequests;

/** The blocking queue that buffers incoming topic requests. */
private final BlockingQueue<RequestEntry<E>> topicRequests;

/** Optional decorator applied to the publish batch request before sending. */
private final UnaryOperator<R> publishDecorator;

/** Executor service for asynchronous (non-FIFO) publishing. */
private final ExecutorService executorService;

/**
Expand Down Expand Up @@ -124,29 +133,6 @@ protected AbstractAmazonSnsConsumer(
scheduledExecutorService.scheduleAtFixedRate(this, 0, topicProperty.getLinger(), TimeUnit.MILLISECONDS);
}

/**
* Publishes a batch request to Amazon SNS.
*
* @param publishBatchRequest the batch request to publish
* @return the publish result
*/
protected abstract O publish(final R publishBatchRequest);

/**
* Handles an error that occurred during publishing.
*
* @param publishBatchRequest the batch request that failed
* @param throwable the exception that was thrown
*/
protected abstract void handleError(final R publishBatchRequest, final Throwable throwable);

/**
* Handles the response from a successful publish call.
*
* @param publishBatchResult the result of the publish operation
*/
protected abstract void handleResponse(final O publishBatchResult);

/**
* Returns a factory function that creates a publish batch request from a topic ARN
* and a list of internal request entries.
Expand Down Expand Up @@ -206,6 +192,7 @@ public void run() {
* Shuts down the consumer, waiting up to 60 seconds for both the scheduled and
* worker executor services to terminate.
*/
@Override
@SneakyThrows
public void shutdown() {
await().thenRun(() -> {
Expand Down Expand Up @@ -340,6 +327,7 @@ private Optional<R> createBatch(final BlockingQueue<RequestEntry<E>> requests) {
*
* @return a future that completes when all requests are drained
*/
@Override
@SneakyThrows
public CompletableFuture<Void> await() {
return CompletableFuture.runAsync(() -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,15 @@
* @param <E> the request entry payload type
*/
@RequiredArgsConstructor(access = AccessLevel.PROTECTED)
abstract class AbstractAmazonSnsProducer<E> {
abstract class AbstractAmazonSnsProducer<E> implements AmazonSnsProducer<E> {

/** The producer lifecycle state, initially {@link State#RUNNIG}. */
private final AtomicReference<State> state = new AtomicReference<>(State.RUNNIG);

/** Map of pending requests keyed by request ID for asynchronous completion. */
private final ConcurrentMap<String, ListenableFuture<ResponseSuccessEntry, ResponseFailEntry>> pendingRequests;

/** The blocking queue for buffering requests before batch processing. */
private final BlockingQueue<RequestEntry<E>> topicRequests;

/**
Expand All @@ -51,6 +54,7 @@ abstract class AbstractAmazonSnsProducer<E> {
* @param requestEntry the request to enqueue
* @return a {@link ListenableFuture} that tracks the completion of this request
*/
@Override
@SneakyThrows
public ListenableFuture<ResponseSuccessEntry, ResponseFailEntry> send(final RequestEntry<E> requestEntry) {
if (State.RUNNIG.equals(state.get())) {
Expand All @@ -74,6 +78,7 @@ public ListenableFuture<ResponseSuccessEntry, ResponseFailEntry> send(final Requ
* Transitions the producer to the shutdown state. No further messages will be
* accepted once shutdown.
*/
@Override
public void shutdown() {
state.compareAndSet(State.RUNNIG, State.SHUTDOWN);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,17 +47,16 @@
* sending messages, shutting down, and awaiting completion. Delegates to a producer
* and consumer for actual processing.
*
* @param <C> the Amazon SNS client type
* @param <R> the publish batch request type
* @param <O> the publish batch result type
* @param <E> the request entry payload type
*/
@RequiredArgsConstructor(access = AccessLevel.PROTECTED)
abstract class AbstractAmazonSnsTemplate<C, R, O, E> {
abstract class AbstractAmazonSnsTemplate<R, O, E> {

private final AbstractAmazonSnsProducer<E> amazonSnsProducer;
private final AmazonSnsProducer<E> amazonSnsProducer;

private final AbstractAmazonSnsConsumer<C, R, O, E> amazonSnsConsumer;
private final AmazonSnsConsumer<R, O> amazonSnsConsumer;

/**
* Sends a request entry to the SNS topic asynchronously.
Expand Down Expand Up @@ -108,7 +107,7 @@ protected static ExecutorService getExecutorService(final TopicProperty topicPro
}

@Getter
public static final class Builder<C, R, O, E, T extends AbstractAmazonSnsTemplate<C, R, O, E>> {
public static final class Builder<C, R, O, E, T extends AbstractAmazonSnsTemplate<R, O, E>> {

/**
* The Amazon SNS client used for publishing.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package com.amazon.sns.messaging.lib.core;

import java.util.concurrent.CompletableFuture;

/**
* Consumer interface for Amazon SNS messaging. Implementations handle batch publishing
* of requests and dispatching of responses or errors to pending request futures.
*
* @param <R> the publish batch request type
* @param <O> the publish batch result type
*/
public interface AmazonSnsConsumer<R, O> {

/**
* Publishes a batch request to Amazon SNS.
*
* @param publishBatchRequest the batch request to publish
* @return the publish result
*/
public abstract O publish(final R publishBatchRequest);

/**
* Handles an error that occurred during publishing.
*
* @param publishBatchRequest the batch request that failed
* @param throwable the exception that was thrown
*/
public void handleError(final R publishBatchRequest, final Throwable throwable);

/**
* Handles the response from a successful publish call.
*
* @param publishBatchResult the result of the publish operation
*/
public void handleResponse(final O publishBatchResult);

/**
* Shuts down the consumer, waiting up to 60 seconds for both the scheduled and
* worker executor services to terminate.
*/
public void shutdown();

/**
* Returns a {@link CompletableFuture} that completes once all pending requests
* have been processed (i.e., both the pending requests map and the topic
* requests queue are empty).
*
* @return a future that completes when all requests are drained
*/
public CompletableFuture<Void> await();

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Copyright 2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.amazon.sns.messaging.lib.core;

import com.amazon.sns.messaging.lib.model.RequestEntry;
import com.amazon.sns.messaging.lib.model.ResponseFailEntry;
import com.amazon.sns.messaging.lib.model.ResponseSuccessEntry;

/**
* Producer interface for Amazon SNS messaging. Implementations enqueue request entries
* for batch publishing and track pending requests for asynchronous completion.
*
* @param <E> the request entry payload type
*/
public interface AmazonSnsProducer<E> {

/**
* Sends a request entry for asynchronous publishing to an SNS topic.
*
* @param requestEntry the request entry containing the message payload and metadata
* @return a {@link ListenableFuture} that completes when the request is processed
*/
public ListenableFuture<ResponseSuccessEntry, ResponseFailEntry> send(final RequestEntry<E> requestEntry);

/**
* Shuts down the producer, preventing any further messages from being accepted.
*/
public void shutdown();

}
// @formatter:on
Original file line number Diff line number Diff line change
Expand Up @@ -116,19 +116,26 @@ public Integer messageAttributesSize(final RequestEntry<?> requestEntry) {
@Builder(setterPrefix = "with")
static class RequestEntryInternal {

/** The creation timestamp in nanoseconds. */
private final long createTime;

/** The unique identifier of the request. */
private final String id;

/** The serialized payload as a byte buffer. */
@Getter(value = AccessLevel.PRIVATE)
private final ByteBuffer value;

/** Optional message attributes / headers. */
private final Map<String, Object> messageHeaders;

/** Optional subject line for the message. */
private final String subject;

/** The message group ID for FIFO topics. */
private final String groupId;

/** The message deduplication ID for FIFO topics. */
private final String deduplicationId;

/**
Expand Down
Loading