diff --git a/.opencode/skills/documentation/SKILL.md b/.opencode/skills/documentation/SKILL.md new file mode 100644 index 0000000..bf01682 --- /dev/null +++ b/.opencode/skills/documentation/SKILL.md @@ -0,0 +1,18 @@ +--- +name: "generate-markdown-docs" +description: "Skill that analyzes code and generates technical documentation in Markdown." +scope: "workspace" +commands: + - "generate docs" + - "document project" +author: "mvallim" +version: "1.0.0" +--- + +## Objective +Analyze the codebase and generate README.md files or technical guides. + +## Instructions +- Use hierarchical headings (#, ##, ###). +- List dependencies in code blocks. +- Include usage examples. \ No newline at end of file diff --git a/GUIDE.md b/GUIDE.md new file mode 100644 index 0000000..4c7f0fb --- /dev/null +++ b/GUIDE.md @@ -0,0 +1,286 @@ +# Amazon SNS Java Messaging Library — Technical Guide + +## Architecture Overview + +The library provides an asynchronous, batched messaging client for Amazon SNS, supporting both AWS SDK v1 and v2. It is organized as a multi-module Maven project: + +| Module | Artifact | Purpose | +|------------------------------------------|------------------------------------|----------------------------------------------------------| +| `amazon-sns-java-messaging-lib-template` | *(internal)* | SDK-agnostic core: batching, queuing, threading, metrics | +| `amazon-sns-java-messaging-lib-v1` | `amazon-sns-java-messaging-lib-v1` | AWS SDK v1 implementation (`AmazonSNS` client) | +| `amazon-sns-java-messaging-lib-v2` | `amazon-sns-java-messaging-lib-v2` | AWS SDK v2 implementation (`SnsClient`) | + +### Core Components + +```text +┌────────────────────────────────────────────────────────────┐ +│ AmazonSnsTemplate │ +├────────────────────────────────────────────────────────────┤ +│ ┌───────────────────────┐ ┌─────────────────────────┐ │ +│ │ AmazonSnsProducer │ │ AmazonSnsConsumer │ │ +│ │ (AbstractProducer) │ │ (AbstractConsumer) │ │ +│ │ │ │ │ │ +│ │ - BlockingQueue │ │ - ScheduledExecutor │ │ +│ │ - PendingRequests │ │ - Batching Logic │ │ +│ └──────────┬────────────┘ └───────────┬─────────────┘ │ +│ │ │ │ +│ send(E) publishBatch(...) │ +└─────────────┼─────────────────────────────┼────────────────┘ + │ │ + ▼ ▼ + ┌───────────────────────────────────────────┐ + │ Amazon SNS (v1/v2) │ + └───────────────────────────────────────────┘ +``` + +- **`AmazonSnsTemplate`** — Main entry point. Created via a fluent builder (`AmazonSnsTemplate.builder(snsClient, topicProperty)`). +- **`AmazonSnsProducer`** — Accepts messages into a `BlockingQueue`, tracks pending futures, and returns `ListenableFuture` results. +- **`AmazonSnsConsumer`** — Scheduled drainer that pulls messages from the queue at `linger` intervals, batches them (respecting count and 256KB size limits), and publishes via the SDK's `publishBatch` API. + +--- + +## Batching Behavior + +Messages are accumulated in a `BlockingQueue` and drained periodically by a scheduled executor. + +- **Linger**: Time (ms) to wait before flushing the batch. Resets on each new message arrival. +- **Max batch size**: Maximum number of messages per `publishBatch` call. +- **256KB limit**: Each batch request must not exceed the SNS payload limit. Messages exceeding 256KB individually throw `MaximumAllowedMessageException`. +- **Memory**: The buffer stores up to `maximumPoolSize × maxBatchSize` messages internally (backed by the `BlockingQueue`). + +For **FIFO** topics, messages are published **synchronously** on a single-threaded executor to preserve ordering. For **standard** topics, publishing is **asynchronous** via a multi-threaded executor. + +--- + +## Message Flow + +1. User calls `template.send(RequestEntry)` +2. Producer serializes the message payload to JSON (via Jackson `ObjectMapper`) +3. Producer enqueues the serialized entry into a `BlockingQueue` and registers a `ListenableFuture` +4. Consumer's scheduled task drains the queue at `linger` intervals, building a `PublishBatchRequest` +5. Consumer calls `publishBatch()` on the SNS client +6. On success: individual `ResponseSuccessEntry` results are matched back to futures by message ID +7. On failure: `ResponseFailEntry` objects complete the corresponding futures with error details + +--- + +## Dependencies + +### Template Module (shared) + +```xml + + org.slf4j:slf4j-api:2.0.6 + org.apache.commons:commons-collections4:4.5.0 + org.apache.commons:commons-lang3:3.20.0 + com.fasterxml.jackson.core:jackson-databind:2.16.1 + io.micrometer:micrometer-core:1.16.3 + org.projectlombok:lombok:1.18.42 (provided) + +``` + +### AWS SDK v1 Module + +```xml + + com.amazonaws + aws-java-sdk-sns + 1.12.661 + +``` + +### AWS SDK v2 Module + +```xml + + software.amazon.awssdk + sns + 2.20.162 + +``` + +### Test Dependencies + +```xml +org.junit.jupiter:junit-jupiter:5.10.2 (test) +org.mockito:mockito-core:4.11.0 (test) +org.awaitility:awaitility:4.2.2 (test) +org.assertj:assertj-core:3.24.2 (test) +org.testcontainers:localstack:1.20.4 (test) +``` + +--- + +## Configuration Reference + +### TopicProperty + +| Property | Type | Default | Description | +|-------------------|-------------|---------|--------------------------------------| +| `fifo` | `boolean` | `false` | Whether the SNS topic is FIFO | +| `topicArn` | `String` | — | The SNS topic ARN | +| `maximumPoolSize` | `int` | — | Max threads for the producer pool | +| `maxBatchSize` | `int` | — | Max messages per batch request | +| `linger` | `long` (ms) | — | Time to wait before flushing a batch | + +**Note**: The in-memory buffer size = `maximumPoolSize × maxBatchSize`. Large values consume proportionally more memory. + +--- + +## Usage Examples + +### 1. Setup with Builder (Recommended) + +```java +// For AWS SDK v1 — AmazonSNS client +// For AWS SDK v2 — SnsClient + +TopicProperty topicProperty = TopicProperty.builder() + .fifo(false) + .linger(100L) + .maxBatchSize(10) + .maximumPoolSize(5) + .topicArn("arn:aws:sns:us-east-2:000000000000:topic") + .build(); + +AmazonSnsTemplate template = AmazonSnsTemplate.builder(snsClient, topicProperty) + .meterRegistry(new SimpleMeterRegistry()) + .topicRequests(new RingBufferBlockingQueue<>(1024)) + .build(); +``` + +### 2. Sending a Standard Message + +```java +template.send( + RequestEntry.builder() + .withValue(new MyMessage("hello")) + .withMessageHeaders(Map.of("source", "app-1")) + .build() +); +``` + +### 3. Sending a FIFO Message + +```java +template.send( + RequestEntry.builder() + .withValue(new MyMessage("ordered-msg")) + .withGroupId("my-group-id") + .withDeduplicationId(UUID.randomUUID().toString()) + .build() +); +``` + +### 4. Async Callbacks + +```java +template.send(requestEntry) + .addCallback( + success -> log.info("Sent: {}", success.getMessageId()), + failure -> log.error("Failed: {} [{}]", failure.getMessage(), failure.getCode()) + ); +``` + +### 5. Await Completion and Shutdown + +```java +template.send(requestEntry); +template.await().thenRun(template::shutdown).join(); +``` + +### 6. Custom ObjectMapper and BlockingQueue + +```java +AmazonSnsTemplate template = new AmazonSnsTemplate<>( + amazonSNS, + topicProperty, + new LinkedBlockingQueue<>(100), + new ObjectMapper() +); +``` + +--- + +## Metrics (Micrometer) + +The library integrates with Micrometer and records the following metrics when a `MeterRegistry` is provided via the builder's `.meterRegistry(registry)`. + +### SNS Publish Metrics + +Tags: `topic` = `` + +| Metric Name | Type | Description | Config | +|--------------------------|-----------------------|----------------------------------------------------|------------------------------------------| +| `sns.publish.attempts` | `Counter` | Total number of SNS PublishBatch calls attempted | — | +| `sns.publish.success` | `Counter` | Individual SNS messages acknowledged as successful | — | +| `sns.publish.failure` | `Counter` | Individual SNS messages that failed | Dynamic tags: `error_code`, `error_type` | +| `sns.publish.duration` | `Timer` | End-to-end latency of SNS PublishBatch calls | Percentiles: 0.5, 0.95, 0.99 | +| `sns.publish.batch.size` | `DistributionSummary` | Number of entries per SNS PublishBatch request | — | +| `sns.publish.inflight` | `Gauge` | PublishBatches currently in progress | Backed by `AtomicInteger` | + +The `sns.publish.failure` counter is created dynamically with additional `error_code` (AWS error code string) and `error_type` (amazon_service_exception or unknown) tags. + +### Blocking Queue Metrics + +Tags: `name` = `` + +| Metric Name | Type | Description | Config | +|--------------------------------|-----------|----------------------------------------------------------------------|-------------------------| +| `blocking.queue.puts.total` | `Counter` | Total number of successful put operations | — | +| `blocking.queue.puts.failed` | `Counter` | Total number of put operations that threw an exception | — | +| `blocking.queue.put.duration` | `Timer` | Latency of put operations (including wait time when queue is full) | Percentile histogram | +| `blocking.queue.takes.total` | `Counter` | Total number of successful take operations | — | +| `blocking.queue.takes.failed` | `Counter` | Total number of take operations that threw an exception | — | +| `blocking.queue.take.duration` | `Timer` | Latency of take operations (including wait time when queue is empty) | Percentile histogram | +| `blocking.queue.size` | `Gauge` | Current number of elements in the queue | Calls `delegate.size()` | + +### Executor Metrics + +Tags: `name` = `` + +| Metric Name | Type | Description | Config | +|----------------------------|-----------|--------------------------------------------------------------------|---------------------------| +| `executor.active` | `Gauge` | Number of tasks currently being executed by pool threads | Backed by `AtomicInteger` | +| `executor.tasks.succeeded` | `Counter` | Total number of tasks that completed without throwing an exception | — | +| `executor.tasks.failed` | `Counter` | Total number of tasks that completed by throwing an exception | — | +| `executor.task.duration` | `Timer` | Wall-clock duration of each task execution | — | + +--- + +## Threading Model + +- **Standard topics**: Uses `AmazonSnsThreadPoolExecutor` with `SynchronousQueue`, zero core threads, and `BlockingSubmissionPolicy` (30s blocking timeout). Threads are created on demand. +- **FIFO topics**: Single-threaded executor to guarantee message ordering. +- **Consumer scheduler**: A `ScheduledExecutorService` with a single daemon thread drains the queue at each `linger` interval. + +--- + +## Exception Handling + +| Exception | Condition | +|-------------------------------------------------------------------|------------------------------------------------------| +| `MaximumAllowedMessageException` | A single message exceeds the 256KB SNS payload limit | +| SDK exceptions (`AmazonServiceException` / `AwsServiceException`) | Service-side errors during `publishBatch` | +| SDK exceptions (`AmazonClientException` / `AwsClientException`) | Client-side errors (network, serialization) | + +Failed messages are delivered to the failure callback with: + +- `messageId` — the original request ID +- `code` — the error code +- `message` — error description +- `senderFault` — whether the error is a client or server fault + +--- + +## Testing + +```bash +# Run unit tests +mvn test + +# Run integration tests (requires Docker for LocalStack) +mvn verify -P integration-test +``` + +The integration tests use [Testcontainers](https://testcontainers.com) with `localstack/localstack:3.4.0` to spin up real SNS and SQS services. Messages are verified by subscribing an SQS queue to the SNS topic and polling for delivery. diff --git a/README.md b/README.md index 350b9d7..8cd452b 100644 --- a/README.md +++ b/README.md @@ -6,7 +6,9 @@ [![Maven Central](https://img.shields.io/maven-central/v/com.github.mvallim/amazon-sns-java-messaging-lib)](https://img.shields.io/maven-central/v/com.github.mvallim/amazon-sns-java-messaging-lib) [![Hex.pm](https://img.shields.io/hexpm/l/plug.svg)](http://www.apache.org/licenses/LICENSE-2.0) -The Amazon SNS Java Messaging Library holds the compatible classes, that are used for communicating with Amazon Simple Notification Service. This project builds on top of the AWS SDK for Java to use Amazon SNS provider for the messaging applications without running any additional software. +The Amazon SNS Java Messaging Library provides an asynchronous, batched messaging client for Amazon SNS, supporting both AWS SDK v1 (`AmazonSNS`) and v2 (`SnsClient`). It features configurable batching with linger time, FIFO ordering, message attributes, and Micrometer metrics. + +> For detailed architecture, threading model, batching behavior, and exception handling, see the [Technical Guide](GUIDE.md). > The batch size should be chosen based on the size of individual messages and available network bandwidth as well as the observed latency and throughput improvements based on the real life load. These are configured to some sensible defaults assuming smaller message sizes and the optimal batch size for server side processing. @@ -32,13 +34,15 @@ In order to use Amazon SNS Java Messaging Lib within a Maven project, simply add You can pull it from the central Maven repositories: +#### Maven + ### For AWS SDK v1 ```xml com.github.mvallim amazon-sns-java-messaging-lib-v1 - 1.2.0 + 1.3.0 ``` @@ -48,7 +52,7 @@ You can pull it from the central Maven repositories: com.github.mvallim amazon-sns-java-messaging-lib-v2 - 1.2.0 + 1.3.0 ``` @@ -70,13 +74,13 @@ If you want to try a snapshot version, add the following repository: ### For AWS SDK v1 ```groovy -implementation 'com.github.mvallim:amazon-sns-java-messaging-lib-v1:1.2.0' +implementation 'com.github.mvallim:amazon-sns-java-messaging-lib-v1:1.3.0' ``` ### For AWS SDK v2 ```groovy -implementation 'com.github.mvallim:amazon-sns-java-messaging-lib-v2:1.2.0' +implementation 'com.github.mvallim:amazon-sns-java-messaging-lib-v2:1.3.0' ``` If you want to try a snapshot version, add the following repository: @@ -103,7 +107,23 @@ repositories { **NOTICE**: the buffer of message store in memory is calculate using **`maximumPoolSize`** * **`maxBatchSize`** huge values demand huge memory. -#### Determining the type of `BlockingQueue` with its maximum capacity +#### Custom `BlockingQueue` + +```java +final TopicProperty topicProperty = TopicProperty.builder() + .fifo(false) + .linger(100) + .maxBatchSize(10) + .maximumPoolSize(20) + .topicArn("arn:aws:sns:us-east-2:000000000000:topic") + .build(); + +AmazonSnsTemplate snsTemplate = AmazonSnsTemplate.builder(amazonSNS, topicProperty) + .topicRequests(new LinkedBlockingQueue<>(100)) + .build(); +``` + +#### Custom `ObjectMapper` ```java final TopicProperty topicProperty = TopicProperty.builder() @@ -113,12 +133,13 @@ final TopicProperty topicProperty = TopicProperty.builder() .maximumPoolSize(20) .topicArn("arn:aws:sns:us-east-2:000000000000:topic") .build(); - -final AmazonSnsTemplate snsTemplate = new AmazonSnsTemplate<>( - amazonSNS, topicProperty, new LinkedBlockingQueue<>(100)); + +AmazonSnsTemplate snsTemplate = AmazonSnsTemplate.builder(amazonSNS, topicProperty) + .objectMapper(new ObjectMapper()) + .build(); ``` -#### Using an `ObjectMapper` other than the default +#### Custom `BlockingQueue` and `ObjectMapper` ```java final TopicProperty topicProperty = TopicProperty.builder() @@ -128,12 +149,14 @@ final TopicProperty topicProperty = TopicProperty.builder() .maximumPoolSize(20) .topicArn("arn:aws:sns:us-east-2:000000000000:topic") .build(); - -final AmazonSnsTemplate snsTemplate = new AmazonSnsTemplate<>( - amazonSNS, topicProperty, new ObjectMapper<>()); + +AmazonSnsTemplate snsTemplate = AmazonSnsTemplate.builder(amazonSNS, topicProperty) + .topicRequests(new LinkedBlockingQueue<>(100)) + .objectMapper(new ObjectMapper()) + .build(); ``` -#### Using an `ObjectMapper` and a `BlockingQueue` other than the default +#### With Micrometer metrics ```java final TopicProperty topicProperty = TopicProperty.builder() @@ -143,9 +166,10 @@ final TopicProperty topicProperty = TopicProperty.builder() .maximumPoolSize(20) .topicArn("arn:aws:sns:us-east-2:000000000000:topic") .build(); - -final AmazonSnsTemplate snsTemplate = new AmazonSnsTemplate<>( - amazonSNS, topicProperty, new LinkedBlockingQueue<>(100), new ObjectMapper<>()); + +AmazonSnsTemplate snsTemplate = AmazonSnsTemplate.builder(amazonSNS, topicProperty) + .meterRegistry(new SimpleMeterRegistry()) + .build(); ``` ### Standard SNS @@ -159,7 +183,7 @@ final TopicProperty topicProperty = TopicProperty.builder() .topicArn("arn:aws:sns:us-east-2:000000000000:topic") .build(); -final AmazonSnsTemplate snsTemplate = new AmazonSnsTemplate<>(amazonSNS, topicProperty); +AmazonSnsTemplate snsTemplate = AmazonSnsTemplate.builder(amazonSNS, topicProperty).build(); final RequestEntry requestEntry = RequestEntry.builder() .withValue(new MyMessage()) @@ -180,7 +204,7 @@ final TopicProperty topicProperty = TopicProperty.builder() .topicArn("arn:aws:sns:us-east-2:000000000000:topic") .build(); -final AmazonSnsTemplate snsTemplate = new AmazonSnsTemplate<>(amazonSNS, topicProperty); +AmazonSnsTemplate snsTemplate = AmazonSnsTemplate.builder(amazonSNS, topicProperty).build(); final RequestEntry requestEntry = RequestEntry.builder() .withValue(new MyMessage()) @@ -203,7 +227,7 @@ final TopicProperty topicProperty = TopicProperty.builder() .topicArn("arn:aws:sns:us-east-2:000000000000:topic") .build(); -final AmazonSnsTemplate snsTemplate = new AmazonSnsTemplate<>(amazonSNS, topicProperty); +AmazonSnsTemplate snsTemplate = AmazonSnsTemplate.builder(amazonSNS, topicProperty).build(); final RequestEntry requestEntry = RequestEntry.builder() .withValue(new MyMessage()) @@ -212,14 +236,14 @@ final RequestEntry requestEntry = RequestEntry.builder() .withDeduplicationId(UUID.randomUUID().toString()) .build(); -snsTemplate.send(requestEntry).addCallback(result -> { - successCallback -> LOGGER.info("{}", successCallback), - failureCallback -> LOGGER.error("{}", failureCallback) -}); +snsTemplate.send(requestEntry).addCallback( + success -> LOGGER.info("Sent: {}", success.getMessageId()), + failure -> LOGGER.error("Failed: {} [{}]", failure.getMessage(), failure.getCode()) +); -snsTemplate.send(requestEntry).addCallback(result -> { - successCallback -> LOGGER.info("{}", successCallback) -}); +snsTemplate.send(requestEntry).addCallback( + success -> LOGGER.info("Sent: {}", success.getMessageId()) +); ``` ### Send And Wait @@ -233,7 +257,7 @@ final TopicProperty topicProperty = TopicProperty.builder() .topicArn("arn:aws:sns:us-east-2:000000000000:topic") .build(); -final AmazonSnsTemplate snsTemplate = new AmazonSnsTemplate<>(amazonSNS, topicProperty); +AmazonSnsTemplate snsTemplate = AmazonSnsTemplate.builder(amazonSNS, topicProperty).build(); final RequestEntry requestEntry = RequestEntry.builder() .withValue(new MyMessage()) @@ -242,10 +266,10 @@ final RequestEntry requestEntry = RequestEntry.builder() .withDeduplicationId(UUID.randomUUID().toString()) .build(); -snsTemplate.send(requestEntry).addCallback(result -> { - successCallback -> LOGGER.info("{}", successCallback), - failureCallback -> LOGGER.error("{}", failureCallback) -}); +snsTemplate.send(requestEntry).addCallback( + success -> LOGGER.info("Sent: {}", success.getMessageId()), + failure -> LOGGER.error("Failed: {} [{}]", failure.getMessage(), failure.getCode()) +); snsTemplate.await().join(); ``` @@ -261,7 +285,7 @@ final TopicProperty topicProperty = TopicProperty.builder() .topicArn("arn:aws:sns:us-east-2:000000000000:topic") .build(); -final AmazonSnsTemplate snsTemplate = new AmazonSnsTemplate<>(amazonSNS, topicProperty); +AmazonSnsTemplate snsTemplate = AmazonSnsTemplate.builder(amazonSNS, topicProperty).build(); final RequestEntry requestEntry = RequestEntry.builder() .withValue(new MyMessage()) @@ -270,14 +294,86 @@ final RequestEntry requestEntry = RequestEntry.builder() .withDeduplicationId(UUID.randomUUID().toString()) .build(); -snsTemplate.send(requestEntry).addCallback(result -> { - successCallback -> LOGGER.info("{}", successCallback), - failureCallback -> LOGGER.error("{}", failureCallback) -}); +snsTemplate.send(requestEntry).addCallback( + success -> LOGGER.info("Sent: {}", success.getMessageId()), + failure -> LOGGER.error("Failed: {} [{}]", failure.getMessage(), failure.getCode()) +); snsTemplate.shutdown(); ``` +### Full Example with Builder + +```java +TopicProperty topicProperty = TopicProperty.builder() + .fifo(false) + .linger(100L) + .maxBatchSize(10) + .maximumPoolSize(5) + .topicArn("arn:aws:sns:us-east-2:000000000000:topic") + .build(); + +AmazonSnsTemplate template = AmazonSnsTemplate.builder(snsClient, topicProperty) + .meterRegistry(new SimpleMeterRegistry()) + .topicRequests(new RingBufferBlockingQueue<>(1024)) + .objectMapper(new ObjectMapper()) + .build(); + +template.send(RequestEntry.builder() + .withValue(new MyMessage("hello")) + .withMessageHeaders(Map.of("source", "app-1")) + .withGroupId(UUID.randomUUID().toString()) + .build()); + +template.await().thenRun(template::shutdown).join(); +``` + +--- + +## Metrics + +When a `MeterRegistry` is provided via the builder, the library records these Micrometer metrics: + +### SNS Publish + +Tags: `topic` = `` + +| Metric | Type | Description | +|--------------------------|---------------------|------------------------------------------------------------| +| `sns.publish.attempts` | Counter | Total PublishBatch attempts | +| `sns.publish.success` | Counter | Successful messages | +| `sns.publish.failure` | Counter | Failed messages (dynamic tags: `error_code`, `error_type`) | +| `sns.publish.duration` | Timer | Publish latency (p50/p95/p99) | +| `sns.publish.batch.size` | DistributionSummary | Messages per batch | +| `sns.publish.inflight` | Gauge | In-flight publish batches | + +### Blocking Queue + +Tags: `name` = `` + +| Metric | Type | Description | +|--------------------------------|---------|-----------------------------------------| +| `blocking.queue.puts.total` | Counter | Successful put operations | +| `blocking.queue.puts.failed` | Counter | Put operations that threw an exception | +| `blocking.queue.put.duration` | Timer | Put latency (percentile histogram) | +| `blocking.queue.takes.total` | Counter | Successful take operations | +| `blocking.queue.takes.failed` | Counter | Take operations that threw an exception | +| `blocking.queue.take.duration` | Timer | Take latency (percentile histogram) | +| `blocking.queue.size` | Gauge | Current queue depth | + +### Executor + +Tags: `name` = `` + +| Metric | Type | Description | +|----------------------------|---------|-----------------------------------| +| `executor.active` | Gauge | Tasks currently executing | +| `executor.tasks.succeeded` | Counter | Tasks completed without exception | +| `executor.tasks.failed` | Counter | Tasks completed with exception | +| `executor.task.duration` | Timer | Task wall-clock duration | + +--- + ## Contributing Please read [CONTRIBUTING.md](CONTRIBUTING.md) for details on our code of conduct, and the process for submitting pull requests to us. @@ -288,7 +384,7 @@ We use [GitHub](https://github.com/mvallim/amazon-sns-java-messaging-lib) for ve ## Authors -* **Marcos Vallim** - *Founder, Author, Development, Test, Documentation* - [mvallim](https://github.com/mvallim) +* **Marcos Vallim** - _Founder, Author, Development, Test, Documentation_ - [mvallim](https://github.com/mvallim) See also the list of [contributors](CONTRIBUTORS.txt) who participated in this project. diff --git a/amazon-sns-java-messaging-lib-template/README.md b/amazon-sns-java-messaging-lib-template/README.md new file mode 100644 index 0000000..d8c5922 --- /dev/null +++ b/amazon-sns-java-messaging-lib-template/README.md @@ -0,0 +1,99 @@ +# amazon-sns-java-messaging-lib-template + +Shared abstract module providing the core framework for batched Amazon SNS publishing. All SDK-specific implementations (`-v1`, `-v2`) depend on this module. + +## Package Structure + +```text +com.amazon.sns.messaging.lib + ├── concurrent/ -- Thread pool, blocking queue, and thread factory utilities + ├── core/ -- Abstract base classes and interfaces for producer/consumer patterns + ├── exception/ -- Custom exceptions (e.g., MaximumAllowedMessageException) + ├── metrics/ -- Micrometer-based metrics decorators (abstract) + └── model/ -- Request/response data models and builders +``` + +## Key Components + +### `core/` + +| Class | Description | +|--------------------------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| `AbstractAmazonSnsTemplate` | High-level template. Composes a producer + consumer. Fluent builder with defaults: `ConcurrentHashMap` for pending requests, `RingBufferBlockingQueue`, identity decorator, `SimpleMeterRegistry`. | +| `AbstractAmazonSnsProducer` | Enqueues `RequestEntry` into a `BlockingQueue`. Tracks lifecycle state (`RUNNIG`/`SHUTDOWN`). | +| `AbstractAmazonSnsConsumer` | Periodic batch drain loop. Polls queue, assembles batches respecting 256 KB limit, publishes via SDK client, dispatches success/failure callbacks. | +| `AmazonSnsConsumer` | Public interface: `publish()`, `handleError()`, `handleResponse()`, `shutdown()`, `await()`. | +| `AmazonSnsProducer` | Public interface: `send()`, `shutdown()`. | +| `ListenableFuture` / `ListenableFutureImpl` | Simple future with success/failure callbacks. Tracks `NEW`/`SUCCESS`/`FAILURE` state. | +| `AbstractMessageAttributes` | Converts message header entries into typed attribute values (`String`, `Number`, `Binary`, `String.Array`, `Enum`). | +| `RequestEntryInternalFactory` | Serializes payloads, computes attribute sizes, creates `RequestEntryInternal` objects for batch processing. | + +### `concurrent/` + +| Class | Description | +|-------------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| `AmazonSnsThreadPoolExecutor` | `ThreadPoolExecutor` with zero core threads, `SynchronousQueue`, 30-second blocking rejection policy. | +| `RingBufferBlockingQueue` | Lock-based ring buffer (`AtomicReferenceArray`) with producer/consumer conditions. Only `put()` / `take()` supported; all else throws `UnsupportedOperationException`. Default capacity: 2048. | +| `BlockingSubmissionPolicy` | Blocks caller up to a configurable timeout when thread pool is saturated. | +| `ThreadFactoryProvider` | Selects virtual thread factories (Java 21+, reflection-based) or default daemon thread factories at runtime. | + +### `metrics/` + +| Class | Description | +|--------------------------------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------| +| `AbstractAmazonSnsConsumerMetricsDecorator` | Abstract Micrometer decorator. Tracks `sns.publish.attempts`, `sns.publish.success`, `sns.publish.failure`, duration, batch size, and inflight gauge. | +| `BlockingQueueMetricsDecorator` | Wraps a `BlockingQueue` with put/take counters, latency histograms, and queue size gauge. | +| `ExecutorServiceMetricsDecorator` | Wraps an `ExecutorService` with active task count, success/failure counters, and task duration timer. | + +### `model/` + +| Class | Description | +|------------------------------|----------------------------------------------------------------------------------------------------------------------------------| +| `RequestEntry` | Single message: payload, headers, subject, FIFO group/deduplication IDs. Auto-generates `createTime` (nanoTime) and `id` (UUID). | +| `TopicProperty` | Topic config: `fifo`, `maximumPoolSize`, `topicArn`, `linger`, `maxBatchSize`. | +| `ResponseSuccessEntry` | Successful result: `id`, `messageId`, `sequenceNumber`. | +| `ResponseFailEntry` | Failure result: `id`, `message`, `code`, `senderFault`. | +| `PublishRequestBuilder` | Generic builder for SDK-specific batch request objects using a `BiFunction` supplier. | + +### `exception/` + +| Class | Description | +|----------------------------------|---------------------------------------------------------| +| `MaximumAllowedMessageException` | Thrown when a single serialized message exceeds 256 KB. | + +## Dependencies + +Managed by parent POM `com.github.mvallim:amazon-sns-java-messaging-lib:1.3.0`: + +```text +org.slf4j:slf4j-api:2.0.6 +org.apache.commons:commons-lang3:3.20.0 +org.apache.commons:commons-collections4:4.5.0 +com.fasterxml.jackson.core:jackson-databind:2.16.1 +io.micrometer:micrometer-core:1.16.3 +org.projectlombok:lombok:1.18.42 (provided) +``` + +## Metrics + +All metrics from this module are inherited by v1 and v2 implementations. See the [Technical Guide](../GUIDE.md#metrics-micrometer) for the complete reference. + +## Usage + +This module is not used directly. Import either `-v1` or `-v2`: + +```xml + + com.github.mvallim + amazon-sns-java-messaging-lib-v1 + 1.3.0 + +``` + +```xml + + com.github.mvallim + amazon-sns-java-messaging-lib-v2 + 1.3.0 + +``` diff --git a/amazon-sns-java-messaging-lib-template/pom.xml b/amazon-sns-java-messaging-lib-template/pom.xml index ef29cca..92afba7 100644 --- a/amazon-sns-java-messaging-lib-template/pom.xml +++ b/amazon-sns-java-messaging-lib-template/pom.xml @@ -7,7 +7,7 @@ com.github.mvallim amazon-sns-java-messaging-lib - 1.2.0-SNAPSHOT + 1.3.0-SNAPSHOT ../pom.xml diff --git a/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/concurrent/AmazonSnsThreadPoolExecutor.java b/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/concurrent/AmazonSnsThreadPoolExecutor.java index 8f1a220..04bf8a7 100644 --- a/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/concurrent/AmazonSnsThreadPoolExecutor.java +++ b/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/concurrent/AmazonSnsThreadPoolExecutor.java @@ -1,5 +1,5 @@ /* - * Copyright 2023 the original author or authors. + * Copyright 2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/concurrent/RingBufferBlockingQueue.java b/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/concurrent/RingBufferBlockingQueue.java index fe1f2de..a3a6370 100644 --- a/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/concurrent/RingBufferBlockingQueue.java +++ b/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/concurrent/RingBufferBlockingQueue.java @@ -1,5 +1,5 @@ /* - * Copyright 2024 the original author or authors. + * Copyright 2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -34,9 +34,9 @@ import lombok.SneakyThrows; /** - * A bounded blocking queue backed by a ring buffer (circular array). Supports blocking - * {@link #put(Object)} and {@link #take()} operations. Other {@link BlockingQueue} methods - * throw {@link UnsupportedOperationException}. + * A bounded blocking queue backed by a ring buffer (circular array). Supports + * blocking {@link #put(Object)} and {@link #take()} operations. Other + * {@link BlockingQueue} methods throw {@link UnsupportedOperationException}. * * @param the type of elements held in this queue */ @@ -51,7 +51,10 @@ public class RingBufferBlockingQueue extends AbstractQueue implements Bloc /** The fixed maximum number of elements the queue can hold. */ private final int capacity; - /** Sequence number tracking the next write position (starts at -1 indicating no writes). */ + /** + * Sequence number tracking the next write position (starts at -1 indicating no + * writes). + */ private final AtomicLong writeSequence = new AtomicLong(-1); /** Sequence number tracking the next read position. */ @@ -91,7 +94,8 @@ public RingBufferBlockingQueue() { } /** - * Prevents sequence overflow by wrapping around when the maximum long value is reached. + * Prevents sequence overflow by wrapping around when the maximum long value is + * reached. * * @param sequence the current sequence value * @return the sequence value, wrapped if necessary diff --git a/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/concurrent/ThreadFactoryProvider.java b/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/concurrent/ThreadFactoryProvider.java index 32ceec3..df6a7e2 100644 --- a/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/concurrent/ThreadFactoryProvider.java +++ b/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/concurrent/ThreadFactoryProvider.java @@ -1,5 +1,5 @@ /* - * Copyright 2023 the original author or authors. + * Copyright 2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -29,18 +29,22 @@ import lombok.SneakyThrows; /** - * Provides {@link ThreadFactory} instances, selecting between virtual thread factories - * (Java 21+) and default thread factories based on the runtime Java version. + * Provides {@link ThreadFactory} instances, selecting between virtual thread + * factories (Java 21+) and default thread factories based on the runtime Java + * version. */ @NoArgsConstructor(access = AccessLevel.PRIVATE) public final class ThreadFactoryProvider { - + /** Class logger. */ private static final Logger LOGGER = LoggerFactory.getLogger(ThreadFactoryProvider.class); - /** Cached supplier of the appropriate thread factory for the runtime Java version. */ + /** + * Cached supplier of the appropriate thread factory for the runtime Java + * version. + */ private static Supplier supplierThreadFactory; - + static { if (ThreadFactoryProvider.getJavaVersion() >= 21) { ThreadFactoryProvider.supplierThreadFactory = ThreadFactoryProvider::getVirtualThreadFactory; @@ -50,7 +54,7 @@ public final class ThreadFactoryProvider { ThreadFactoryProvider.LOGGER.info("Java version is {}, using default thread factory", ThreadFactoryProvider.getJavaVersion()); } } - + /** * Returns a {@link ThreadFactory} appropriate for the current Java version. * @@ -59,7 +63,7 @@ public final class ThreadFactoryProvider { public static ThreadFactory getThreadFactory() { return ThreadFactoryProvider.supplierThreadFactory.get(); } - + /** * Creates a default thread factory for Java versions below 21. * @@ -69,7 +73,7 @@ public static ThreadFactory getThreadFactory() { private static ThreadFactory getDefaultThreadFactory() { return Executors.defaultThreadFactory(); } - + /** * Creates a virtual thread factory using reflection (Java 21+). * @@ -92,16 +96,16 @@ private static ThreadFactory getVirtualThreadFactory() { */ private static int getJavaVersion() { String version = System.getProperty("java.version"); - + if (version.startsWith("1.")) { version = version.substring(2); } - + final int dotPos = version.indexOf('.'); final int dashPos = version.indexOf('-'); final int endIndex = dotPos > -1 ? dotPos : dashPos > -1 ? dashPos : 1; - + return Integer.parseInt(version.substring(0, endIndex)); } - + } \ No newline at end of file diff --git a/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/core/AbstractAmazonSnsConsumer.java b/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/core/AbstractAmazonSnsConsumer.java index 924f3f7..ed4abb7 100644 --- a/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/core/AbstractAmazonSnsConsumer.java +++ b/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/core/AbstractAmazonSnsConsumer.java @@ -1,5 +1,5 @@ /* - * Copyright 2023 the original author or authors. + * Copyright 2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/core/AbstractAmazonSnsProducer.java b/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/core/AbstractAmazonSnsProducer.java index af6685a..34282e9 100644 --- a/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/core/AbstractAmazonSnsProducer.java +++ b/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/core/AbstractAmazonSnsProducer.java @@ -1,5 +1,5 @@ /* - * Copyright 2023 the original author or authors. + * Copyright 2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/core/AbstractAmazonSnsTemplate.java b/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/core/AbstractAmazonSnsTemplate.java index c159664..05c7f05 100644 --- a/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/core/AbstractAmazonSnsTemplate.java +++ b/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/core/AbstractAmazonSnsTemplate.java @@ -1,5 +1,5 @@ /* - * Copyright 2023 the original author or authors. + * Copyright 2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/core/AbstractMessageAttributes.java b/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/core/AbstractMessageAttributes.java index 90368ab..07ece3c 100644 --- a/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/core/AbstractMessageAttributes.java +++ b/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/core/AbstractMessageAttributes.java @@ -1,5 +1,5 @@ /* - * Copyright 2023 the original author or authors. + * Copyright 2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/core/AmazonSnsConsumer.java b/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/core/AmazonSnsConsumer.java index 14776d5..1741aa7 100644 --- a/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/core/AmazonSnsConsumer.java +++ b/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/core/AmazonSnsConsumer.java @@ -1,10 +1,27 @@ +/* + * Copyright 2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package com.amazon.sns.messaging.lib.core; import java.util.concurrent.CompletableFuture; /** - * Consumer interface for Amazon SNS messaging. Implementations handle batch publishing - * of requests and dispatching of responses or errors to pending request futures. + * Consumer interface for Amazon SNS messaging. Implementations handle batch + * publishing of requests and dispatching of responses or errors to pending + * request futures. * * @param the publish batch request type * @param the publish batch result type diff --git a/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/core/AmazonSnsProducer.java b/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/core/AmazonSnsProducer.java index 91f9b14..526584e 100644 --- a/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/core/AmazonSnsProducer.java +++ b/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/core/AmazonSnsProducer.java @@ -1,5 +1,5 @@ /* - * Copyright 2023 the original author or authors. + * Copyright 2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,8 +21,9 @@ import com.amazon.sns.messaging.lib.model.ResponseSuccessEntry; /** - * Producer interface for Amazon SNS messaging. Implementations enqueue request entries - * for batch publishing and track pending requests for asynchronous completion. + * Producer interface for Amazon SNS messaging. Implementations enqueue request + * entries for batch publishing and track pending requests for asynchronous + * completion. * * @param the request entry payload type */ @@ -31,8 +32,10 @@ public interface AmazonSnsProducer { /** * Sends a request entry for asynchronous publishing to an SNS topic. * - * @param requestEntry the request entry containing the message payload and metadata - * @return a {@link ListenableFuture} that completes when the request is processed + * @param requestEntry the request entry containing the message payload and + * metadata + * @return a {@link ListenableFuture} that completes when the request is + * processed */ public ListenableFuture send(final RequestEntry requestEntry); diff --git a/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/core/ListenableFuture.java b/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/core/ListenableFuture.java index ac16c1f..9c0068e 100644 --- a/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/core/ListenableFuture.java +++ b/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/core/ListenableFuture.java @@ -1,5 +1,5 @@ /* - * Copyright 2023 the original author or authors. + * Copyright 2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/core/ListenableFutureImpl.java b/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/core/ListenableFutureImpl.java index cfe3e54..7d8f085 100644 --- a/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/core/ListenableFutureImpl.java +++ b/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/core/ListenableFutureImpl.java @@ -1,5 +1,5 @@ /* - * Copyright 2023 the original author or authors. + * Copyright 2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/core/RequestEntryInternalFactory.java b/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/core/RequestEntryInternalFactory.java index 95f1d59..a6a080a 100644 --- a/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/core/RequestEntryInternalFactory.java +++ b/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/core/RequestEntryInternalFactory.java @@ -1,5 +1,5 @@ /* - * Copyright 2023 the original author or authors. + * Copyright 2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/exception/MaximumAllowedMessageException.java b/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/exception/MaximumAllowedMessageException.java index 1782046..a2259dd 100644 --- a/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/exception/MaximumAllowedMessageException.java +++ b/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/exception/MaximumAllowedMessageException.java @@ -1,3 +1,19 @@ +/* + * Copyright 2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package com.amazon.sns.messaging.lib.exception; import com.amazon.sns.messaging.lib.model.RequestEntry; @@ -5,8 +21,8 @@ import lombok.Getter; /** - * Thrown when a message exceeds the maximum allowed size of 256 KB for Amazon SNS. - * Contains the offending {@link RequestEntry} for diagnostic purposes. + * Thrown when a message exceeds the maximum allowed size of 256 KB for Amazon + * SNS. Contains the offending {@link RequestEntry} for diagnostic purposes. */ @Getter @SuppressWarnings({ "rawtypes", "unchecked" }) @@ -17,7 +33,8 @@ public class MaximumAllowedMessageException extends RuntimeException { private final RequestEntry request; /** - * Constructs a new exception with the given detail message and the offending request. + * Constructs a new exception with the given detail message and the offending + * request. * * @param string the detail message * @param request the request entry that exceeded the size limit diff --git a/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/metrics/AbstractAmazonSnsConsumerMetricsDecorator.java b/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/metrics/AbstractAmazonSnsConsumerMetricsDecorator.java index e31034d..836e689 100644 --- a/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/metrics/AbstractAmazonSnsConsumerMetricsDecorator.java +++ b/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/metrics/AbstractAmazonSnsConsumerMetricsDecorator.java @@ -1,3 +1,19 @@ +/* + * Copyright 2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package com.amazon.sns.messaging.lib.metrics; import java.util.Optional; diff --git a/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/metrics/BlockingQueueMetricsDecorator.java b/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/metrics/BlockingQueueMetricsDecorator.java index a948a69..4a8c9f0 100644 --- a/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/metrics/BlockingQueueMetricsDecorator.java +++ b/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/metrics/BlockingQueueMetricsDecorator.java @@ -1,3 +1,19 @@ +/* + * Copyright 2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package com.amazon.sns.messaging.lib.metrics; import java.util.Collection; diff --git a/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/metrics/ExecutorServiceMetricsDecorator.java b/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/metrics/ExecutorServiceMetricsDecorator.java index 0decc2c..fa748de 100644 --- a/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/metrics/ExecutorServiceMetricsDecorator.java +++ b/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/metrics/ExecutorServiceMetricsDecorator.java @@ -1,3 +1,19 @@ +/* + * Copyright 2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package com.amazon.sns.messaging.lib.metrics; import java.util.Collection; diff --git a/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/model/PublishRequestBuilder.java b/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/model/PublishRequestBuilder.java index 74516e3..d12dd61 100644 --- a/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/model/PublishRequestBuilder.java +++ b/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/model/PublishRequestBuilder.java @@ -54,7 +54,10 @@ public static Builder builder() { @RequiredArgsConstructor(access = AccessLevel.PRIVATE) public static class Builder { - /** The supplier function that creates a publish request from a topic ARN and entries. */ + /** + * The supplier function that creates a publish request from a topic ARN and + * entries. + */ private BiFunction, R> supplier; /** The SNS topic ARN to publish to. */ diff --git a/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/model/RequestEntry.java b/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/model/RequestEntry.java index 1a114b3..5d15adb 100644 --- a/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/model/RequestEntry.java +++ b/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/model/RequestEntry.java @@ -28,7 +28,8 @@ /** * Represents a single message request to be published to an Amazon SNS topic. - * Contains the message payload, metadata, and optional FIFO-related identifiers. + * Contains the message payload, metadata, and optional FIFO-related + * identifiers. * * @param the type of the message payload */ diff --git a/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/model/ResponseFailEntry.java b/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/model/ResponseFailEntry.java index 915bf39..053f445 100644 --- a/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/model/ResponseFailEntry.java +++ b/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/model/ResponseFailEntry.java @@ -23,7 +23,8 @@ import lombok.ToString; /** - * Represents a failed publish result from Amazon SNS, containing the error details. + * Represents a failed publish result from Amazon SNS, containing the error + * details. */ @Getter @ToString diff --git a/amazon-sns-java-messaging-lib-template/src/test/java/com/amazon/sns/messaging/lib/concurrent/AmazonSnsThreadPoolExecutorTest.java b/amazon-sns-java-messaging-lib-template/src/test/java/com/amazon/sns/messaging/lib/concurrent/AmazonSnsThreadPoolExecutorTest.java index 3eb0ab3..c9910e6 100644 --- a/amazon-sns-java-messaging-lib-template/src/test/java/com/amazon/sns/messaging/lib/concurrent/AmazonSnsThreadPoolExecutorTest.java +++ b/amazon-sns-java-messaging-lib-template/src/test/java/com/amazon/sns/messaging/lib/concurrent/AmazonSnsThreadPoolExecutorTest.java @@ -1,5 +1,5 @@ /* - * Copyright 2024 the original author or authors. + * Copyright 2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/amazon-sns-java-messaging-lib-template/src/test/java/com/amazon/sns/messaging/lib/concurrent/BlockingSubmissionPolicyTest.java b/amazon-sns-java-messaging-lib-template/src/test/java/com/amazon/sns/messaging/lib/concurrent/BlockingSubmissionPolicyTest.java index bdfe763..1b906c9 100644 --- a/amazon-sns-java-messaging-lib-template/src/test/java/com/amazon/sns/messaging/lib/concurrent/BlockingSubmissionPolicyTest.java +++ b/amazon-sns-java-messaging-lib-template/src/test/java/com/amazon/sns/messaging/lib/concurrent/BlockingSubmissionPolicyTest.java @@ -1,5 +1,5 @@ /* - * Copyright 2024 the original author or authors. + * Copyright 2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/amazon-sns-java-messaging-lib-template/src/test/java/com/amazon/sns/messaging/lib/concurrent/RingBufferBlockingQueueTest.java b/amazon-sns-java-messaging-lib-template/src/test/java/com/amazon/sns/messaging/lib/concurrent/RingBufferBlockingQueueTest.java index 0a0764c..5ecd3e3 100644 --- a/amazon-sns-java-messaging-lib-template/src/test/java/com/amazon/sns/messaging/lib/concurrent/RingBufferBlockingQueueTest.java +++ b/amazon-sns-java-messaging-lib-template/src/test/java/com/amazon/sns/messaging/lib/concurrent/RingBufferBlockingQueueTest.java @@ -1,5 +1,5 @@ /* - * Copyright 2024 the original author or authors. + * Copyright 2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/amazon-sns-java-messaging-lib-template/src/test/java/com/amazon/sns/messaging/lib/core/AbstractAmazonSnsConsumerTest.java b/amazon-sns-java-messaging-lib-template/src/test/java/com/amazon/sns/messaging/lib/core/AbstractAmazonSnsConsumerTest.java index 0944284..ea7a0f0 100644 --- a/amazon-sns-java-messaging-lib-template/src/test/java/com/amazon/sns/messaging/lib/core/AbstractAmazonSnsConsumerTest.java +++ b/amazon-sns-java-messaging-lib-template/src/test/java/com/amazon/sns/messaging/lib/core/AbstractAmazonSnsConsumerTest.java @@ -1,5 +1,5 @@ /* - * Copyright 2023 the original author or authors. + * Copyright 2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/amazon-sns-java-messaging-lib-template/src/test/java/com/amazon/sns/messaging/lib/core/AbstractAmazonSnsProducerTest.java b/amazon-sns-java-messaging-lib-template/src/test/java/com/amazon/sns/messaging/lib/core/AbstractAmazonSnsProducerTest.java index 8d1c7ee..eba4111 100644 --- a/amazon-sns-java-messaging-lib-template/src/test/java/com/amazon/sns/messaging/lib/core/AbstractAmazonSnsProducerTest.java +++ b/amazon-sns-java-messaging-lib-template/src/test/java/com/amazon/sns/messaging/lib/core/AbstractAmazonSnsProducerTest.java @@ -1,5 +1,5 @@ /* - * Copyright 2023 the original author or authors. + * Copyright 2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/amazon-sns-java-messaging-lib-template/src/test/java/com/amazon/sns/messaging/lib/core/AbstractAmazonSnsTemplateTest.java b/amazon-sns-java-messaging-lib-template/src/test/java/com/amazon/sns/messaging/lib/core/AbstractAmazonSnsTemplateTest.java index 4f39288..6bee662 100644 --- a/amazon-sns-java-messaging-lib-template/src/test/java/com/amazon/sns/messaging/lib/core/AbstractAmazonSnsTemplateTest.java +++ b/amazon-sns-java-messaging-lib-template/src/test/java/com/amazon/sns/messaging/lib/core/AbstractAmazonSnsTemplateTest.java @@ -1,5 +1,5 @@ /* - * Copyright 2023 the original author or authors. + * Copyright 2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/amazon-sns-java-messaging-lib-template/src/test/java/com/amazon/sns/messaging/lib/core/AbstractMessageAttributesTest.java b/amazon-sns-java-messaging-lib-template/src/test/java/com/amazon/sns/messaging/lib/core/AbstractMessageAttributesTest.java index 077814f..fac6a8f 100644 --- a/amazon-sns-java-messaging-lib-template/src/test/java/com/amazon/sns/messaging/lib/core/AbstractMessageAttributesTest.java +++ b/amazon-sns-java-messaging-lib-template/src/test/java/com/amazon/sns/messaging/lib/core/AbstractMessageAttributesTest.java @@ -1,5 +1,5 @@ /* - * Copyright 2023 the original author or authors. + * Copyright 2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/amazon-sns-java-messaging-lib-template/src/test/java/com/amazon/sns/messaging/lib/core/ListenableFutureImplTest.java b/amazon-sns-java-messaging-lib-template/src/test/java/com/amazon/sns/messaging/lib/core/ListenableFutureImplTest.java index e3c12de..926cf8c 100644 --- a/amazon-sns-java-messaging-lib-template/src/test/java/com/amazon/sns/messaging/lib/core/ListenableFutureImplTest.java +++ b/amazon-sns-java-messaging-lib-template/src/test/java/com/amazon/sns/messaging/lib/core/ListenableFutureImplTest.java @@ -1,5 +1,5 @@ /* - * Copyright 2024 the original author or authors. + * Copyright 2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/amazon-sns-java-messaging-lib-template/src/test/java/com/amazon/sns/messaging/lib/core/ListenableFutureTest.java b/amazon-sns-java-messaging-lib-template/src/test/java/com/amazon/sns/messaging/lib/core/ListenableFutureTest.java index aca744c..1b7cfa6 100644 --- a/amazon-sns-java-messaging-lib-template/src/test/java/com/amazon/sns/messaging/lib/core/ListenableFutureTest.java +++ b/amazon-sns-java-messaging-lib-template/src/test/java/com/amazon/sns/messaging/lib/core/ListenableFutureTest.java @@ -1,5 +1,5 @@ /* - * Copyright 2024 the original author or authors. + * Copyright 2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/amazon-sns-java-messaging-lib-template/src/test/java/com/amazon/sns/messaging/lib/helpers/TryConsumer.java b/amazon-sns-java-messaging-lib-template/src/test/java/com/amazon/sns/messaging/lib/helpers/TryConsumer.java index 832f2eb..b6ba8a3 100644 --- a/amazon-sns-java-messaging-lib-template/src/test/java/com/amazon/sns/messaging/lib/helpers/TryConsumer.java +++ b/amazon-sns-java-messaging-lib-template/src/test/java/com/amazon/sns/messaging/lib/helpers/TryConsumer.java @@ -1,5 +1,5 @@ /* - * Copyright 2023 the original author or authors. + * Copyright 2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/amazon-sns-java-messaging-lib-template/src/test/java/com/amazon/sns/messaging/lib/metrics/BlockingQueueMetricsDecoratorTest.java b/amazon-sns-java-messaging-lib-template/src/test/java/com/amazon/sns/messaging/lib/metrics/BlockingQueueMetricsDecoratorTest.java index 5a2bd88..307e809 100644 --- a/amazon-sns-java-messaging-lib-template/src/test/java/com/amazon/sns/messaging/lib/metrics/BlockingQueueMetricsDecoratorTest.java +++ b/amazon-sns-java-messaging-lib-template/src/test/java/com/amazon/sns/messaging/lib/metrics/BlockingQueueMetricsDecoratorTest.java @@ -1,5 +1,5 @@ /* - * Copyright 2023 the original author or authors. + * Copyright 2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/amazon-sns-java-messaging-lib-template/src/test/java/com/amazon/sns/messaging/lib/metrics/ExecutorServiceMetricsDecoratorTest.java b/amazon-sns-java-messaging-lib-template/src/test/java/com/amazon/sns/messaging/lib/metrics/ExecutorServiceMetricsDecoratorTest.java index 05e9135..cf56226 100644 --- a/amazon-sns-java-messaging-lib-template/src/test/java/com/amazon/sns/messaging/lib/metrics/ExecutorServiceMetricsDecoratorTest.java +++ b/amazon-sns-java-messaging-lib-template/src/test/java/com/amazon/sns/messaging/lib/metrics/ExecutorServiceMetricsDecoratorTest.java @@ -1,5 +1,5 @@ /* - * Copyright 2023 the original author or authors. + * Copyright 2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/amazon-sns-java-messaging-lib-v1/README.md b/amazon-sns-java-messaging-lib-v1/README.md new file mode 100644 index 0000000..923ef40 --- /dev/null +++ b/amazon-sns-java-messaging-lib-v1/README.md @@ -0,0 +1,121 @@ +# amazon-sns-java-messaging-lib-v1 + +AWS SDK v1 implementation of the Amazon SNS Java Messaging Library. Provides batched message publishing to SNS using `com.amazonaws:aws-java-sdk-sns:1.12.661`. + +## Package Structure + +```text +com.amazon.sns.messaging.lib + ├── core/ + │ ├── AmazonSnsTemplate.java -- Public API entry point + │ ├── AmazonSnsProducerImpl.java -- Producer (enqueues requests) + │ ├── AmazonSnsConsumerImpl.java -- Consumer (calls AmazonSNS.publishBatch) + │ └── MessageAttributes.java -- Header-to-MessageAttributeValue converter + └── metrics/ + └── AmazonSnsConsumerMetricsDecorator.java -- Micrometer metrics decorator +``` + +## Key Classes + +| Class | Description | +|-------------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| `AmazonSnsTemplate` | Extends `AbstractAmazonSnsTemplate`. Primary API: `send()`, `shutdown()`, `await()`. Use the builder: `AmazonSnsTemplate.builder(amazonSNS, topicProperty)`. Deprecated constructors available for backward compatibility. | +| `AmazonSnsProducerImpl` | Extends `AbstractAmazonSnsProducer`. Thin wrapper that enqueues `RequestEntry` into a shared blocking queue. | +| `AmazonSnsConsumerImpl` | Extends `AbstractAmazonSnsConsumer`. Calls `AmazonSNS.publishBatch()` with v1 `PublishBatchRequest`/`PublishBatchResult`. Handles per-entry success/failure from batch response. | +| `MessageAttributes` | Extends `AbstractMessageAttributes`. Converts header entries to v1 `MessageAttributeValue` objects (String, Number, Binary, String.Array, Enum). | +| `AmazonSnsConsumerMetricsDecorator` | Extends `AbstractAmazonSnsConsumerMetricsDecorator`. Records publish attempts, latency, batch size, inflight count. Tags failures by `AmazonServiceException` error code. | + +## Dependencies + +```text +com.github.mvallim:amazon-sns-java-messaging-lib-template:1.3.0 +com.amazonaws:aws-java-sdk-sns:1.12.661 +``` + +## Usage + +### Standard SNS Topic + +```java +AmazonSNS amazonSNS = AmazonSNSClientBuilder.defaultClient(); + +TopicProperty topicProperty = TopicProperty.builder() + .fifo(false) + .linger(100) + .maxBatchSize(10) + .maximumPoolSize(20) + .topicArn("arn:aws:sns:us-east-2:000000000000:topic") + .build(); + +AmazonSnsTemplate snsTemplate = AmazonSnsTemplate.builder(amazonSNS, topicProperty).build(); + +RequestEntry entry = RequestEntry.builder() + .withValue(new MyMessage()) + .withMessageHeaders(Map.of("header1", "value1")) + .build(); + +snsTemplate.send(entry); +snsTemplate.shutdown(); +``` + +### FIFO SNS Topic + +```java +TopicProperty topicProperty = TopicProperty.builder() + .fifo(true) + .linger(100) + .maxBatchSize(10) + .maximumPoolSize(20) + .topicArn("arn:aws:sns:us-east-2:000000000000:topic.fifo") + .build(); + +AmazonSnsTemplate snsTemplate = AmazonSnsTemplate.builder(amazonSNS, topicProperty).build(); + +RequestEntry entry = RequestEntry.builder() + .withValue(new MyMessage()) + .withGroupId(UUID.randomUUID().toString()) + .withDeduplicationId(UUID.randomUUID().toString()) + .build(); + +snsTemplate.send(entry).addCallback( + success -> LOGGER.info("Sent: {}", success), + failure -> LOGGER.error("Failed: {}", failure) +); +snsTemplate.await().join(); +``` + +### With Custom ObjectMapper and Queue + +```java +AmazonSnsTemplate snsTemplate = AmazonSnsTemplate.builder(amazonSNS, topicProperty) + .objectMapper(new ObjectMapper()) + .topicRequests(new LinkedBlockingQueue<>(100)) + .publishDecorator(req -> req) + .build(); +``` + +## Metrics + +When a `MeterRegistry` is provided via `.meterRegistry()`, the following metrics are recorded: + +| Metric | Type | Tags | Description | +|--------------------------------|---------------------|-------------------------------------|-----------------------------| +| `sns.publish.attempts` | Counter | `topic` | Total PublishBatch attempts | +| `sns.publish.success` | Counter | `topic` | Successful messages | +| `sns.publish.failure` | Counter | `topic`, `error_code`, `error_type` | Failed messages | +| `sns.publish.duration` | Timer (p50/p95/p99) | `topic` | Publish latency | +| `sns.publish.batch.size` | DistributionSummary | `topic` | Messages per batch | +| `sns.publish.inflight` | Gauge | `topic` | In-flight publish batches | +| `blocking.queue.puts.total` | Counter | `name` | Successful put operations | +| `blocking.queue.puts.failed` | Counter | `name` | Failed put operations | +| `blocking.queue.put.duration` | Timer | `name` | Put latency | +| `blocking.queue.takes.total` | Counter | `name` | Successful take operations | +| `blocking.queue.takes.failed` | Counter | `name` | Failed take operations | +| `blocking.queue.take.duration` | Timer | `name` | Take latency | +| `blocking.queue.size` | Gauge | `name` | Queue depth | +| `executor.active` | Gauge | `name` | Active tasks | +| `executor.tasks.succeeded` | Counter | `name` | Successful tasks | +| `executor.tasks.failed` | Counter | `name` | Failed tasks | +| `executor.task.duration` | Timer | `name` | Task duration | + +See the [Technical Guide](../GUIDE.md#metrics-micrometer) for details. diff --git a/amazon-sns-java-messaging-lib-v1/pom.xml b/amazon-sns-java-messaging-lib-v1/pom.xml index d0712e9..83977d4 100644 --- a/amazon-sns-java-messaging-lib-v1/pom.xml +++ b/amazon-sns-java-messaging-lib-v1/pom.xml @@ -7,7 +7,7 @@ com.github.mvallim amazon-sns-java-messaging-lib - 1.2.0-SNAPSHOT + 1.3.0-SNAPSHOT ../pom.xml diff --git a/amazon-sns-java-messaging-lib-v1/src/main/java/com/amazon/sns/messaging/lib/core/AmazonSnsConsumerImpl.java b/amazon-sns-java-messaging-lib-v1/src/main/java/com/amazon/sns/messaging/lib/core/AmazonSnsConsumerImpl.java index 4f2c24e..df58d45 100644 --- a/amazon-sns-java-messaging-lib-v1/src/main/java/com/amazon/sns/messaging/lib/core/AmazonSnsConsumerImpl.java +++ b/amazon-sns-java-messaging-lib-v1/src/main/java/com/amazon/sns/messaging/lib/core/AmazonSnsConsumerImpl.java @@ -1,5 +1,5 @@ /* - * Copyright 2023 the original author or authors. + * Copyright 2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/amazon-sns-java-messaging-lib-v1/src/main/java/com/amazon/sns/messaging/lib/core/AmazonSnsProducerImpl.java b/amazon-sns-java-messaging-lib-v1/src/main/java/com/amazon/sns/messaging/lib/core/AmazonSnsProducerImpl.java index 9c25f96..16143fe 100644 --- a/amazon-sns-java-messaging-lib-v1/src/main/java/com/amazon/sns/messaging/lib/core/AmazonSnsProducerImpl.java +++ b/amazon-sns-java-messaging-lib-v1/src/main/java/com/amazon/sns/messaging/lib/core/AmazonSnsProducerImpl.java @@ -1,5 +1,5 @@ /* - * Copyright 2023 the original author or authors. + * Copyright 2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/amazon-sns-java-messaging-lib-v1/src/main/java/com/amazon/sns/messaging/lib/core/AmazonSnsTemplate.java b/amazon-sns-java-messaging-lib-v1/src/main/java/com/amazon/sns/messaging/lib/core/AmazonSnsTemplate.java index d4c9e0f..481b357 100644 --- a/amazon-sns-java-messaging-lib-v1/src/main/java/com/amazon/sns/messaging/lib/core/AmazonSnsTemplate.java +++ b/amazon-sns-java-messaging-lib-v1/src/main/java/com/amazon/sns/messaging/lib/core/AmazonSnsTemplate.java @@ -1,5 +1,5 @@ /* - * Copyright 2023 the original author or authors. + * Copyright 2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/amazon-sns-java-messaging-lib-v1/src/main/java/com/amazon/sns/messaging/lib/core/MessageAttributes.java b/amazon-sns-java-messaging-lib-v1/src/main/java/com/amazon/sns/messaging/lib/core/MessageAttributes.java index 450661f..462b194 100644 --- a/amazon-sns-java-messaging-lib-v1/src/main/java/com/amazon/sns/messaging/lib/core/MessageAttributes.java +++ b/amazon-sns-java-messaging-lib-v1/src/main/java/com/amazon/sns/messaging/lib/core/MessageAttributes.java @@ -1,5 +1,5 @@ /* - * Copyright 2023 the original author or authors. + * Copyright 2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/amazon-sns-java-messaging-lib-v1/src/main/java/com/amazon/sns/messaging/lib/metrics/AmazonSnsConsumerMetricsDecorator.java b/amazon-sns-java-messaging-lib-v1/src/main/java/com/amazon/sns/messaging/lib/metrics/AmazonSnsConsumerMetricsDecorator.java index ab75eed..413758e 100644 --- a/amazon-sns-java-messaging-lib-v1/src/main/java/com/amazon/sns/messaging/lib/metrics/AmazonSnsConsumerMetricsDecorator.java +++ b/amazon-sns-java-messaging-lib-v1/src/main/java/com/amazon/sns/messaging/lib/metrics/AmazonSnsConsumerMetricsDecorator.java @@ -1,3 +1,19 @@ +/* + * Copyright 2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package com.amazon.sns.messaging.lib.metrics; import org.slf4j.Logger; @@ -10,6 +26,7 @@ import com.amazonaws.services.sns.model.PublishBatchResult; import io.micrometer.core.instrument.MeterRegistry; +import lombok.SneakyThrows; // @formatter:off /** @@ -39,6 +56,7 @@ public AmazonSnsConsumerMetricsDecorator( * {@inheritDoc} */ @Override + @SneakyThrows public PublishBatchResult publish(final PublishBatchRequest publishBatchRequest) { publishAttemptsCounter.increment(); batchSizeSummary.record(publishBatchRequest.getPublishBatchRequestEntries().size()); @@ -46,10 +64,6 @@ public PublishBatchResult publish(final PublishBatchRequest publishBatchRequest) try { return publishTimer.recordCallable(() -> delegate.publish(publishBatchRequest)); - } catch (final RuntimeException ex) { - throw ex; - } catch (final Exception ex) { - throw new RuntimeException(ex); } finally { inflightGauge.decrementAndGet(); } diff --git a/amazon-sns-java-messaging-lib-v1/src/test/java/com/amazon/sns/messaging/lib/core/AmazonSnsProducerAsyncTest.java b/amazon-sns-java-messaging-lib-v1/src/test/java/com/amazon/sns/messaging/lib/core/AmazonSnsProducerAsyncTest.java index dcb64eb..3d72b37 100644 --- a/amazon-sns-java-messaging-lib-v1/src/test/java/com/amazon/sns/messaging/lib/core/AmazonSnsProducerAsyncTest.java +++ b/amazon-sns-java-messaging-lib-v1/src/test/java/com/amazon/sns/messaging/lib/core/AmazonSnsProducerAsyncTest.java @@ -1,5 +1,5 @@ /* - * Copyright 2023 the original author or authors. + * Copyright 2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/amazon-sns-java-messaging-lib-v1/src/test/java/com/amazon/sns/messaging/lib/core/AmazonSnsProducerSyncTest.java b/amazon-sns-java-messaging-lib-v1/src/test/java/com/amazon/sns/messaging/lib/core/AmazonSnsProducerSyncTest.java index 82284a1..f7eb544 100644 --- a/amazon-sns-java-messaging-lib-v1/src/test/java/com/amazon/sns/messaging/lib/core/AmazonSnsProducerSyncTest.java +++ b/amazon-sns-java-messaging-lib-v1/src/test/java/com/amazon/sns/messaging/lib/core/AmazonSnsProducerSyncTest.java @@ -1,5 +1,5 @@ /* - * Copyright 2023 the original author or authors. + * Copyright 2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/amazon-sns-java-messaging-lib-v1/src/test/java/com/amazon/sns/messaging/lib/core/AmazonSnsTemplateIntegrationTest.java b/amazon-sns-java-messaging-lib-v1/src/test/java/com/amazon/sns/messaging/lib/core/AmazonSnsTemplateIntegrationTest.java index 87b2541..b13f42f 100644 --- a/amazon-sns-java-messaging-lib-v1/src/test/java/com/amazon/sns/messaging/lib/core/AmazonSnsTemplateIntegrationTest.java +++ b/amazon-sns-java-messaging-lib-v1/src/test/java/com/amazon/sns/messaging/lib/core/AmazonSnsTemplateIntegrationTest.java @@ -1,5 +1,5 @@ /* - * Copyright 2023 the original author or authors. + * Copyright 2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/amazon-sns-java-messaging-lib-v1/src/test/java/com/amazon/sns/messaging/lib/core/MessageAttributesTest.java b/amazon-sns-java-messaging-lib-v1/src/test/java/com/amazon/sns/messaging/lib/core/MessageAttributesTest.java index 7dfc54c..54a8934 100644 --- a/amazon-sns-java-messaging-lib-v1/src/test/java/com/amazon/sns/messaging/lib/core/MessageAttributesTest.java +++ b/amazon-sns-java-messaging-lib-v1/src/test/java/com/amazon/sns/messaging/lib/core/MessageAttributesTest.java @@ -1,5 +1,5 @@ /* - * Copyright 2023 the original author or authors. + * Copyright 2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/amazon-sns-java-messaging-lib-v1/src/test/java/com/amazon/sns/messaging/lib/core/helper/ConsumerHelper.java b/amazon-sns-java-messaging-lib-v1/src/test/java/com/amazon/sns/messaging/lib/core/helper/ConsumerHelper.java index cbd68ea..e36d340 100644 --- a/amazon-sns-java-messaging-lib-v1/src/test/java/com/amazon/sns/messaging/lib/core/helper/ConsumerHelper.java +++ b/amazon-sns-java-messaging-lib-v1/src/test/java/com/amazon/sns/messaging/lib/core/helper/ConsumerHelper.java @@ -1,5 +1,5 @@ /* - * Copyright 2023 the original author or authors. + * Copyright 2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/amazon-sns-java-messaging-lib-v1/src/test/java/com/amazon/sns/messaging/lib/metrics/AmazonSnsConsumerMetricsDecoratorTest.java b/amazon-sns-java-messaging-lib-v1/src/test/java/com/amazon/sns/messaging/lib/metrics/AmazonSnsConsumerMetricsDecoratorTest.java index e03b07c..885f98d 100644 --- a/amazon-sns-java-messaging-lib-v1/src/test/java/com/amazon/sns/messaging/lib/metrics/AmazonSnsConsumerMetricsDecoratorTest.java +++ b/amazon-sns-java-messaging-lib-v1/src/test/java/com/amazon/sns/messaging/lib/metrics/AmazonSnsConsumerMetricsDecoratorTest.java @@ -1,5 +1,5 @@ /* - * Copyright 2023 the original author or authors. + * Copyright 2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,22 +16,25 @@ package com.amazon.sns.messaging.lib.metrics; -import static org.assertj.core.api.Assertions.assertThat; -import static org.assertj.core.api.Assertions.assertThatThrownBy; -import static org.mockito.ArgumentMatchers.any; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.notNullValue; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -import java.util.Arrays; -import java.util.Collections; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.stream.IntStream; import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.DisplayName; -import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; +import org.mockito.Spy; import org.mockito.junit.jupiter.MockitoExtension; import com.amazon.sns.messaging.lib.core.AmazonSnsConsumer; @@ -44,14 +47,19 @@ import com.amazonaws.services.sns.model.PublishBatchResultEntry; import io.micrometer.core.instrument.Counter; -import io.micrometer.core.instrument.MeterRegistry; +import io.micrometer.core.instrument.DistributionSummary; +import io.micrometer.core.instrument.Gauge; +import io.micrometer.core.instrument.Timer; import io.micrometer.core.instrument.simple.SimpleMeterRegistry; // @formatter:off @ExtendWith(MockitoExtension.class) class AmazonSnsConsumerMetricsDecoratorTest { - private static final String TOPIC_ARN = "arn:aws:sns:us-east-1:000000000000:my-topic"; + private static final String TOPIC_ARN = "arn:aws:sns:us-east-1:000000000000:test-topic"; + + @Spy + private SimpleMeterRegistry meterRegistry; @Mock private AmazonSnsConsumer delegate; @@ -59,404 +67,484 @@ class AmazonSnsConsumerMetricsDecoratorTest { @Mock private TopicProperty topicProperty; - private MeterRegistry registry; - - private AmazonSnsConsumerMetricsDecorator sut; + private AmazonSnsConsumerMetricsDecorator decorator; @BeforeEach void setUp() { - registry = new SimpleMeterRegistry(); when(topicProperty.getTopicArn()).thenReturn(TOPIC_ARN); - sut = new AmazonSnsConsumerMetricsDecorator(delegate, topicProperty, registry); + + decorator = new AmazonSnsConsumerMetricsDecorator(delegate, topicProperty, meterRegistry); } - @Nested - @DisplayName("publish()") - class Publish { + @Test + void testConstructorRegistersPublishAttemptsCounter() { + final Counter counter = meterRegistry.find(AbstractAmazonSnsConsumerMetricsDecorator.METRIC_PUBLISH_ATTEMPTS) + .tag("topic", TOPIC_ARN) + .counter(); + + assertThat(counter, notNullValue()); + } - @Test - @DisplayName("should delegate to the wrapped consumer") - void shouldDelegateToWrappedConsumer() { - final PublishBatchRequest request = batchRequest(2); - when(delegate.publish(request)).thenReturn(successResult("id-1", "id-2")); + @Test + void testConstructorRegistersPublishSuccessCounter() { + final Counter counter = meterRegistry.find(AbstractAmazonSnsConsumerMetricsDecorator.METRIC_PUBLISH_SUCCESS) + .tag("topic", TOPIC_ARN) + .counter(); - sut.publish(request); + assertThat(counter, notNullValue()); + } - verify(delegate, times(1)).publish(request); - } + @Test + void testConstructorRegistersPublishDurationTimer() { + final Timer timer = meterRegistry.find(AbstractAmazonSnsConsumerMetricsDecorator.METRIC_PUBLISH_DURATION) + .tag("topic", TOPIC_ARN) + .timer(); - @Test - @DisplayName("should increment attempt counter on success") - void shouldIncrementAttemptCounterOnSuccess() { - when(delegate.publish(any())).thenReturn(successResult("id-1")); + assertThat(timer, notNullValue()); + } - sut.publish(batchRequest(1)); + @Test + void testConstructorRegistersPublishBatchSizeSummary() { + final DistributionSummary summary = meterRegistry.find(AbstractAmazonSnsConsumerMetricsDecorator.METRIC_PUBLISH_BATCH_SIZE) + .tag("topic", TOPIC_ARN) + .summary(); - assertThat(attemptsCount()).isEqualTo(1.0); - } + assertThat(summary, notNullValue()); + } - @Test - @DisplayName("should increment attempt counter even when delegate throws") - void shouldIncrementAttemptCounterOnException() { - when(delegate.publish(any())).thenThrow(new RuntimeException("connection refused")); + @Test + void testConstructorRegistersInflightGauge() { + final Gauge gauge = meterRegistry.find(AbstractAmazonSnsConsumerMetricsDecorator.METRIC_PUBLISH_INFLIGHT) + .tag("topic", TOPIC_ARN) + .gauge(); - assertThatThrownBy(() -> sut.publish(batchRequest(1))).isInstanceOf(RuntimeException.class); + assertThat(gauge, notNullValue()); + } - assertThat(attemptsCount()).isEqualTo(1.0); - } + @Test + void testConstructorWithNullMeterRegistryDoesNotThrow() { + final AmazonSnsConsumerMetricsDecorator nullRegistryDecorator = new AmazonSnsConsumerMetricsDecorator(delegate, topicProperty, null); - @Test - @DisplayName("should record batch size in distribution summary") - void shouldRecordBatchSize() { - when(delegate.publish(any())).thenReturn(successResult("a", "b", "c")); + assertThat(nullRegistryDecorator, notNullValue()); + } - sut.publish(batchRequest(3)); + @Test + void testPublishIncrementsAttemptsCounter() { + final PublishBatchRequest request = buildRequest(3); + when(delegate.publish(request)).thenReturn(new PublishBatchResult()); - assertThat(batchSizeCount()).isEqualTo(1L); - assertThat(batchSizeMean()).isEqualTo(3.0); - } + decorator.publish(request); - @Test - @DisplayName("should record duration in timer") - void shouldRecordDurationInTimer() { - when(delegate.publish(any())).thenReturn(successResult("x")); + final double count = meterRegistry.find(AbstractAmazonSnsConsumerMetricsDecorator.METRIC_PUBLISH_ATTEMPTS) + .tag("topic", TOPIC_ARN) + .counter() + .count(); - sut.publish(batchRequest(1)); + assertThat(count, equalTo(1.0)); + } - assertThat(timerCount()).isEqualTo(1L); - } + @Test + void testPublishRecordsBatchSize() { + final PublishBatchRequest request = buildRequest(5); + when(delegate.publish(request)).thenReturn(new PublishBatchResult()); - @Test - @DisplayName("should decrement inflight gauge after successful publish") - void shouldDecrementInflightAfterSuccess() { - when(delegate.publish(any())).thenReturn(successResult("y")); + decorator.publish(request); - sut.publish(batchRequest(1)); + final double mean = meterRegistry.find(AbstractAmazonSnsConsumerMetricsDecorator.METRIC_PUBLISH_BATCH_SIZE) + .tag("topic", TOPIC_ARN) + .summary() + .mean(); - assertThat(inflightValue()).isZero(); - } + assertThat(mean, equalTo(5.0)); + } - @Test - @DisplayName("should decrement inflight gauge even when delegate throws") - void shouldDecrementInflightAfterException() { - when(delegate.publish(any())).thenThrow(new RuntimeException("timeout")); + @Test + void testPublishRecordsTimer() { + final PublishBatchRequest request = buildRequest(2); + when(delegate.publish(request)).thenReturn(new PublishBatchResult()); - assertThatThrownBy(() -> sut.publish(batchRequest(1))).isInstanceOf(RuntimeException.class); + decorator.publish(request); - assertThat(inflightValue()).isZero(); - } + final long timerCount = meterRegistry.find(AbstractAmazonSnsConsumerMetricsDecorator.METRIC_PUBLISH_DURATION) + .tag("topic", TOPIC_ARN) + .timer() + .count(); - @Test - @DisplayName("should propagate RuntimeException from delegate unchanged") - void shouldPropagateRuntimeException() { - final RuntimeException cause = new RuntimeException("sns down"); - when(delegate.publish(any())).thenThrow(cause); + assertThat(timerCount, equalTo(1L)); + } - assertThatThrownBy(() -> sut.publish(batchRequest(1))).isSameAs(cause); - } + @Test + void testPublishInflightGaugeIsZeroAfterCompletion() { + final PublishBatchRequest request = buildRequest(1); + when(delegate.publish(request)).thenReturn(new PublishBatchResult()); - @Test - @DisplayName("should wrap checked Exception from delegate in RuntimeException") - void shouldWrapCheckedExceptionInRuntimeException() { - when(delegate.publish(any())).thenAnswer(inv -> { - throw new Exception("checked"); - }); + decorator.publish(request); - assertThatThrownBy(() -> sut.publish(batchRequest(1))).isInstanceOf(RuntimeException.class).hasMessageContaining("checked"); - } - } + final double inflight = meterRegistry.find(AbstractAmazonSnsConsumerMetricsDecorator.METRIC_PUBLISH_INFLIGHT) + .tag("topic", TOPIC_ARN) + .gauge() + .value(); - @Nested - @DisplayName("handleResponse()") - class HandleResponse { + assertThat(inflight, equalTo(0.0)); + } - @Test - @DisplayName("should delegate to the wrapped consumer") - void shouldDelegateToWrappedConsumer() { - final PublishBatchResult result = successResult("id-1"); + @Test + void testPublishInflightGaugeIsZeroAfterException() { + final PublishBatchRequest request = buildRequest(1); + when(delegate.publish(request)).thenThrow(new RuntimeException("delegate error")); - sut.handleResponse(result); + assertThrows(RuntimeException.class, () -> decorator.publish(request)); - verify(delegate, times(1)).handleResponse(result); - } + final double inflight = meterRegistry.find(AbstractAmazonSnsConsumerMetricsDecorator.METRIC_PUBLISH_INFLIGHT) + .tag("topic", TOPIC_ARN) + .gauge() + .value(); - @Test - @DisplayName("should increment success counter once per successful message") - void shouldIncrementSuccessCounterPerMessage() { - final PublishBatchResult result = successResult("id-1", "id-2", "id-3"); + assertThat(inflight, equalTo(0.0)); + } - sut.handleResponse(result); + @Test + void testPublishDelegatesCallToDelegate() { + final PublishBatchRequest request = buildRequest(2); + final PublishBatchResult expectedResult = new PublishBatchResult(); + when(delegate.publish(request)).thenReturn(expectedResult); - assertThat(successCount()).isEqualTo(3.0); - } + final PublishBatchResult result = decorator.publish(request); - @Test - @DisplayName("should not increment success counter when there are no successful entries") - void shouldNotIncrementSuccessCounterWhenEmpty() { - final PublishBatchResult result = new PublishBatchResult(); - result.setSuccessful(Collections.emptyList()); - result.setFailed(Collections.singleton(failedEntry("id-1", "InvalidParameter"))); + assertThat(result, equalTo(expectedResult)); + verify(delegate, times(1)).publish(request); + } - sut.handleResponse(result); + @Test + void testPublishRethrowsRuntimeException() { + final PublishBatchRequest request = buildRequest(1); + when(delegate.publish(request)).thenThrow(new IllegalStateException("boom")); - assertThat(successCount()).isZero(); - } + final RuntimeException thrown = assertThrows(RuntimeException.class, () -> decorator.publish(request)); - @Test - @DisplayName("should increment failure counter once per failed message") - void shouldIncrementFailureCounterPerFailedMessage() { - final PublishBatchResult result = new PublishBatchResult(); - result.setSuccessful(Collections.emptyList()); - result.setFailed(Arrays.asList(failedEntry("id-a", "InvalidParameter"), failedEntry("id-b", "MessageTooLong"))); + assertThat(thrown, instanceOf(IllegalStateException.class)); + } - sut.handleResponse(result); + @Test + void testPublishMultipleTimesAccumulatesAttemptsCounter() { + final PublishBatchRequest request = buildRequest(1); + when(delegate.publish(request)).thenReturn(new PublishBatchResult()); - assertThat(failureCountByCode("InvalidParameter")).isEqualTo(1.0); - assertThat(failureCountByCode("MessageTooLong")).isEqualTo(1.0); - } + decorator.publish(request); + decorator.publish(request); + decorator.publish(request); - @Test - @DisplayName("should accumulate failures with the same error code") - void shouldAccumulateFailuresWithSameErrorCode() { - final PublishBatchResult result = new PublishBatchResult(); - result.setSuccessful(Collections.emptyList()); - result.setFailed(Arrays.asList(failedEntry("id-a", "InvalidParameter"), failedEntry("id-b", "InvalidParameter"), failedEntry("id-c", "InvalidParameter"))); + final double count = meterRegistry.find(AbstractAmazonSnsConsumerMetricsDecorator.METRIC_PUBLISH_ATTEMPTS) + .tag("topic", TOPIC_ARN) + .counter() + .count(); - sut.handleResponse(result); + assertThat(count, equalTo(3.0)); + } - assertThat(failureCountByCode("InvalidParameter")).isEqualTo(3.0); - } + @Test + void testHandleResponseIncrementsSuccessCounter() { + final PublishBatchResult result = buildResult(3, 0); - @Test - @DisplayName("should handle mixed batch with both successes and failures") - void shouldHandleMixedBatch() { - final PublishBatchResult result = new PublishBatchResult(); - result.setSuccessful(Arrays.asList(successEntry("id-ok-1"), successEntry("id-ok-2"))); - result.setFailed(Collections.singleton(failedEntry("id-bad", "KMSDisabled"))); + decorator.handleResponse(result); - sut.handleResponse(result); + final double count = meterRegistry.find(AbstractAmazonSnsConsumerMetricsDecorator.METRIC_PUBLISH_SUCCESS) + .tag("topic", TOPIC_ARN) + .counter() + .count(); - assertThat(successCount()).isEqualTo(2.0); - assertThat(failureCountByCode("KMSDisabled")).isEqualTo(1.0); - } + assertThat(count, equalTo(3.0)); + } - @Test - @DisplayName("should tag failures with error_type 'amazon_service_exception'") - void shouldTagFailuresWithAmazonErrorType() { - final PublishBatchResult result = new PublishBatchResult(); - result.setSuccessful(Collections.emptyList()); - result.setFailed(Collections.singleton(failedEntry("id-x", "ThrottledException"))); + @Test + void testHandleResponseDoesNotIncrementSuccessCounterWhenZeroSuccessful() { + final PublishBatchResult result = buildResult(0, 2); - sut.handleResponse(result); + decorator.handleResponse(result); - final Counter failureCounter = registry.find(AbstractAmazonSnsConsumerMetricsDecorator.METRIC_PUBLISH_FAILURE) - .tag("error_code", "ThrottledException") - .tag("error_type", AbstractAmazonSnsConsumerMetricsDecorator.ERROR_TYPE_AMAZON) - .counter(); + final double count = meterRegistry.find(AbstractAmazonSnsConsumerMetricsDecorator.METRIC_PUBLISH_SUCCESS) + .tag("topic", TOPIC_ARN) + .counter() + .count(); - assertThat(failureCounter).isNotNull(); - assertThat(failureCounter.count()).isEqualTo(1.0); - } + assertThat(count, equalTo(0.0)); } - @Nested - @DisplayName("handleError()") - class HandleError { + @Test + void testHandleResponseIncrementsFailureCounterForEachFailedEntry() { + final PublishBatchResult result = buildResult(0, 2); + result.getFailed().get(0).setCode("InvalidParameter"); + result.getFailed().get(1).setCode("AuthorizationError"); + + decorator.handleResponse(result); + + final double count1 = meterRegistry.find(AbstractAmazonSnsConsumerMetricsDecorator.METRIC_PUBLISH_FAILURE) + .tag("topic", TOPIC_ARN) + .tag(AbstractAmazonSnsConsumerMetricsDecorator.TAG_ERROR_CODE, "InvalidParameter") + .tag(AbstractAmazonSnsConsumerMetricsDecorator.TAG_ERROR_TYPE, AbstractAmazonSnsConsumerMetricsDecorator.ERROR_TYPE_AMAZON) + .counter() + .count(); + + final double count2 = meterRegistry.find(AbstractAmazonSnsConsumerMetricsDecorator.METRIC_PUBLISH_FAILURE) + .tag("topic", TOPIC_ARN) + .tag(AbstractAmazonSnsConsumerMetricsDecorator.TAG_ERROR_CODE, "AuthorizationError") + .tag(AbstractAmazonSnsConsumerMetricsDecorator.TAG_ERROR_TYPE, AbstractAmazonSnsConsumerMetricsDecorator.ERROR_TYPE_AMAZON) + .counter() + .count(); - @Test - @DisplayName("should delegate to the wrapped consumer") - void shouldDelegateToWrappedConsumer() { - final PublishBatchRequest request = batchRequest(1); - final RuntimeException cause = new RuntimeException("transport error"); + assertThat(count1, equalTo(1.0)); + assertThat(count2, equalTo(1.0)); + } - sut.handleError(request, cause); + @Test + void testHandleResponseDelegatesCallToDelegate() { + final PublishBatchResult result = buildResult(1, 0); - verify(delegate, times(1)).handleError(request, cause); - } + decorator.handleResponse(result); - @Test - @DisplayName("should count all batch entries as failures on AmazonServiceException") - void shouldCountAllEntriesAsFailures_onAmazonServiceException() { - final PublishBatchRequest request = batchRequest(3); - final AmazonServiceException cause = serviceException("InternalError"); + verify(delegate, times(1)).handleResponse(result); + } - sut.handleError(request, cause); + @Test + void testHandleResponseWithMixedSuccessAndFailure() { + final PublishBatchResult result = buildResult(2, 1); + result.getFailed().get(0).setCode("Throttling"); - assertThat(failureCountByCode("InternalError")).isEqualTo(3.0); - } + decorator.handleResponse(result); - @Test - @DisplayName("should tag AmazonServiceException failures with correct error_type") - void shouldTagAmazonServiceExceptionWithCorrectErrorType() { - final PublishBatchRequest request = batchRequest(1); - final AmazonServiceException cause = serviceException("InternalError"); + final double successCount = meterRegistry.find(AbstractAmazonSnsConsumerMetricsDecorator.METRIC_PUBLISH_SUCCESS) + .tag("topic", TOPIC_ARN) + .counter() + .count(); - sut.handleError(request, cause); + final double failureCount = meterRegistry.find(AbstractAmazonSnsConsumerMetricsDecorator.METRIC_PUBLISH_FAILURE) + .tag("topic", TOPIC_ARN) + .tag(AbstractAmazonSnsConsumerMetricsDecorator.TAG_ERROR_CODE, "Throttling") + .tag(AbstractAmazonSnsConsumerMetricsDecorator.TAG_ERROR_TYPE, AbstractAmazonSnsConsumerMetricsDecorator.ERROR_TYPE_AMAZON) + .counter() + .count(); - final Counter counter = registry.find(AbstractAmazonSnsConsumerMetricsDecorator.METRIC_PUBLISH_FAILURE) - .tag("error_code", "InternalError") - .tag("error_type", AbstractAmazonSnsConsumerMetricsDecorator.ERROR_TYPE_AMAZON) - .counter(); + assertThat(successCount, equalTo(2.0)); + assertThat(failureCount, equalTo(1.0)); + } - assertThat(counter).isNotNull(); - assertThat(counter.count()).isEqualTo(1.0); - } + @Test + void testHandleResponseAccumulatesSuccessCounterAcrossMultipleCalls() { + decorator.handleResponse(buildResult(2, 0)); + decorator.handleResponse(buildResult(3, 0)); - @Test - @DisplayName("should use error code '000' for non-AmazonServiceException") - void shouldUseDefaultErrorCode_forGenericException() { - final PublishBatchRequest request = batchRequest(2); + final double count = meterRegistry.find(AbstractAmazonSnsConsumerMetricsDecorator.METRIC_PUBLISH_SUCCESS) + .tag("topic", TOPIC_ARN) + .counter() + .count(); - sut.handleError(request, new RuntimeException("network timeout")); + assertThat(count, equalTo(5.0)); + } - assertThat(failureCountByCode("000")).isEqualTo(2.0); - } + @Test + void testHandleErrorWithAmazonServiceExceptionIncrementsFailureCounter() { + final PublishBatchRequest request = buildRequest(3); + final AmazonServiceException ex = new AmazonServiceException("Service error"); + ex.setErrorCode("ServiceUnavailable"); - @Test - @DisplayName("should tag generic exceptions with error_type 'unknown'") - void shouldTagGenericExceptionsWithUnknownErrorType() { - final PublishBatchRequest request = batchRequest(1); + decorator.handleError(request, ex); - sut.handleError(request, new RuntimeException("network timeout")); + final double count = meterRegistry.find(AbstractAmazonSnsConsumerMetricsDecorator.METRIC_PUBLISH_FAILURE) + .tag("topic", TOPIC_ARN) + .tag(AbstractAmazonSnsConsumerMetricsDecorator.TAG_ERROR_CODE, "ServiceUnavailable") + .tag(AbstractAmazonSnsConsumerMetricsDecorator.TAG_ERROR_TYPE, AbstractAmazonSnsConsumerMetricsDecorator.ERROR_TYPE_AMAZON) + .counter() + .count(); - final Counter counter = registry.find(AbstractAmazonSnsConsumerMetricsDecorator.METRIC_PUBLISH_FAILURE) - .tag("error_code", "000") - .tag("error_type", AbstractAmazonSnsConsumerMetricsDecorator.ERROR_TYPE_OTHER) - .counter(); + assertThat(count, equalTo(3.0)); + } - assertThat(counter).isNotNull(); - assertThat(counter.count()).isEqualTo(1.0); - } + @Test + void testHandleErrorWithUnknownExceptionIncrementsFailureCounterWithCode000() { + final PublishBatchRequest request = buildRequest(2); + final RuntimeException ex = new RuntimeException("unexpected"); - @Test - @DisplayName("should count each entry individually when batch has multiple entries") - void shouldCountEachEntryIndividually() { - final PublishBatchRequest request = batchRequest(5); - final AmazonServiceException cause = serviceException("ServiceUnavailable"); + decorator.handleError(request, ex); - sut.handleError(request, cause); + final double count = meterRegistry.find(AbstractAmazonSnsConsumerMetricsDecorator.METRIC_PUBLISH_FAILURE) + .tag("topic", TOPIC_ARN) + .tag(AbstractAmazonSnsConsumerMetricsDecorator.TAG_ERROR_CODE, "000") + .tag(AbstractAmazonSnsConsumerMetricsDecorator.TAG_ERROR_TYPE, AbstractAmazonSnsConsumerMetricsDecorator.ERROR_TYPE_OTHER) + .counter() + .count(); - assertThat(failureCountByCode("ServiceUnavailable")).isEqualTo(5.0); - } + assertThat(count, equalTo(2.0)); } - @Nested - @DisplayName("lifecycle methods") - class Lifecycle { + @Test + void testHandleErrorWithAmazonServiceExceptionUsesAmazonErrorType() { + final PublishBatchRequest request = buildRequest(1); + final AmazonServiceException ex = new AmazonServiceException("error"); + ex.setErrorCode("AccessDenied"); + + decorator.handleError(request, ex); - @Test - @DisplayName("shutdown() should delegate to the wrapped consumer") - void shutdown_shouldDelegate() { - sut.shutdown(); - verify(delegate, times(1)).shutdown(); - } + final Counter counter = meterRegistry.find(AbstractAmazonSnsConsumerMetricsDecorator.METRIC_PUBLISH_FAILURE) + .tag("topic", TOPIC_ARN) + .tag(AbstractAmazonSnsConsumerMetricsDecorator.TAG_ERROR_TYPE, AbstractAmazonSnsConsumerMetricsDecorator.ERROR_TYPE_AMAZON) + .counter(); - @Test - @DisplayName("await() should delegate to the wrapped consumer") - void await_shouldDelegate() { - sut.await(); - verify(delegate, times(1)).await(); - } + assertThat(counter, notNullValue()); + assertThat(counter.count(), equalTo(1.0)); } - @Nested - @DisplayName("null MeterRegistry") - class NullRegistry { + @Test + void testHandleErrorWithNonAmazonExceptionUsesUnknownErrorType() { + final PublishBatchRequest request = buildRequest(1); + final NullPointerException ex = new NullPointerException("npe"); + + decorator.handleError(request, ex); + + final Counter counter = meterRegistry.find(AbstractAmazonSnsConsumerMetricsDecorator.METRIC_PUBLISH_FAILURE) + .tag("topic", TOPIC_ARN) + .tag(AbstractAmazonSnsConsumerMetricsDecorator.TAG_ERROR_TYPE, AbstractAmazonSnsConsumerMetricsDecorator.ERROR_TYPE_OTHER) + .counter(); + + assertThat(counter, notNullValue()); + assertThat(counter.count(), equalTo(1.0)); + } - @Test - @DisplayName("should not throw when MeterRegistry is null") - void shouldNotThrowWhenRegistryIsNull() { - final AmazonSnsConsumerMetricsDecorator nullRegistrySut = new AmazonSnsConsumerMetricsDecorator(delegate, topicProperty, null); + @Test + void testHandleErrorDelegatesCallToDelegate() { + final PublishBatchRequest request = buildRequest(1); + final RuntimeException ex = new RuntimeException("err"); - when(delegate.publish(any())).thenReturn(successResult("id-1")); + decorator.handleError(request, ex); - // should execute without NullPointerException - nullRegistrySut.publish(batchRequest(1)); - } + verify(delegate, times(1)).handleError(request, ex); } - private double attemptsCount() { - return registry.find(AbstractAmazonSnsConsumerMetricsDecorator.METRIC_PUBLISH_ATTEMPTS) + @Test + void testHandleErrorCountsAllEntriesInBatch() { + final PublishBatchRequest request = buildRequest(10); + final RuntimeException ex = new RuntimeException("bulk failure"); + + decorator.handleError(request, ex); + + final double count = meterRegistry.find(AbstractAmazonSnsConsumerMetricsDecorator.METRIC_PUBLISH_FAILURE) .tag("topic", TOPIC_ARN) + .tag(AbstractAmazonSnsConsumerMetricsDecorator.TAG_ERROR_CODE, "000") + .tag(AbstractAmazonSnsConsumerMetricsDecorator.TAG_ERROR_TYPE, AbstractAmazonSnsConsumerMetricsDecorator.ERROR_TYPE_OTHER) .counter() .count(); + + assertThat(count, equalTo(10.0)); } - private double successCount() { - final Counter c = registry.find(AbstractAmazonSnsConsumerMetricsDecorator.METRIC_PUBLISH_SUCCESS) - .tag("topic", TOPIC_ARN) - .counter(); + @Test + void testShutdownDelegatesCallToDelegate() { + decorator.shutdown(); + verify(delegate, times(1)).shutdown(); + } - return c == null ? 0.0 : c.count(); + @Test + void testAwaitDelegatesCallToDelegate() { + when(delegate.await()).thenReturn(CompletableFuture.completedFuture(null)); + + decorator.await(); + + verify(delegate, times(1)).await(); } - private double failureCountByCode(final String code) { - final Counter c = registry.find(AbstractAmazonSnsConsumerMetricsDecorator.METRIC_PUBLISH_FAILURE) - .tag("topic", TOPIC_ARN) - .tag("error_code", code) - .counter(); + @Test + void testAwaitReturnsTheDelegatesFuture() { + final CompletableFuture expected = CompletableFuture.completedFuture(null); + when(delegate.await()).thenReturn(expected); - return c == null ? 0.0 : c.count(); + final CompletableFuture result = decorator.await(); + + assertThat(result, equalTo(expected)); } - private long batchSizeCount() { - return registry.find(AbstractAmazonSnsConsumerMetricsDecorator.METRIC_PUBLISH_BATCH_SIZE) + @Test + void testFullSuccessfulPublishFlowUpdatesAllMetrics() { + final PublishBatchRequest request = buildRequest(4); + final PublishBatchResult batchResult = buildResult(4, 0); + when(delegate.publish(request)).thenReturn(batchResult); + + final PublishBatchResult result = decorator.publish(request); + decorator.handleResponse(result); + + final double attempts = meterRegistry.find(AbstractAmazonSnsConsumerMetricsDecorator.METRIC_PUBLISH_ATTEMPTS) .tag("topic", TOPIC_ARN) - .summary() + .counter() + .count(); + + final double success = meterRegistry.find(AbstractAmazonSnsConsumerMetricsDecorator.METRIC_PUBLISH_SUCCESS) + .tag("topic", TOPIC_ARN) + .counter() .count(); - } - private double batchSizeMean() { - return registry.find(AbstractAmazonSnsConsumerMetricsDecorator.METRIC_PUBLISH_BATCH_SIZE) + final double batchMean = meterRegistry.find(AbstractAmazonSnsConsumerMetricsDecorator.METRIC_PUBLISH_BATCH_SIZE) .tag("topic", TOPIC_ARN) .summary() .mean(); - } - private long timerCount() { - return registry.find(AbstractAmazonSnsConsumerMetricsDecorator.METRIC_PUBLISH_DURATION) + final long timerCount = meterRegistry.find(AbstractAmazonSnsConsumerMetricsDecorator.METRIC_PUBLISH_DURATION) .tag("topic", TOPIC_ARN) .timer() .count(); + + assertThat(attempts, equalTo(1.0)); + assertThat(success, equalTo(4.0)); + assertThat(batchMean, equalTo(4.0)); + assertThat(timerCount, equalTo(1L)); } - private double inflightValue() { - return registry.find(AbstractAmazonSnsConsumerMetricsDecorator.METRIC_PUBLISH_INFLIGHT) + @Test + void testFullErrorFlowUpdatesFailureMetrics() { + final PublishBatchRequest request = buildRequest(3); + final AmazonServiceException ex = new AmazonServiceException("timeout"); + ex.setErrorCode("RequestExpired"); + when(delegate.publish(request)).thenThrow(ex); + + assertThrows(RuntimeException.class, () -> decorator.publish(request)); + decorator.handleError(request, ex); + + final double attempts = meterRegistry.find(AbstractAmazonSnsConsumerMetricsDecorator.METRIC_PUBLISH_ATTEMPTS) .tag("topic", TOPIC_ARN) - .gauge() - .value(); - } + .counter() + .count(); - private static PublishBatchRequest batchRequest(final int count) { - final PublishBatchRequest request = new PublishBatchRequest().withTopicArn(TOPIC_ARN); - for (int i = 0; i < count; i++) { - request.getPublishBatchRequestEntries().add(new PublishBatchRequestEntry().withId("id-" + i).withMessage("msg-" + i)); - } - return request; - } + final double failures = meterRegistry.find(AbstractAmazonSnsConsumerMetricsDecorator.METRIC_PUBLISH_FAILURE) + .tag("topic", TOPIC_ARN) + .tag(AbstractAmazonSnsConsumerMetricsDecorator.TAG_ERROR_CODE, "RequestExpired") + .tag(AbstractAmazonSnsConsumerMetricsDecorator.TAG_ERROR_TYPE, AbstractAmazonSnsConsumerMetricsDecorator.ERROR_TYPE_AMAZON) + .counter().count(); - private static PublishBatchResult successResult(final String... ids) { - final PublishBatchResult result = new PublishBatchResult(); - for (final String id : ids) { - result.getSuccessful().add(successEntry(id)); - } - result.setFailed(Collections.emptyList()); - return result; + assertThat(attempts, equalTo(1.0)); + assertThat(failures, equalTo(3.0)); } - private static PublishBatchResultEntry successEntry(final String id) { - return new PublishBatchResultEntry().withId(id).withMessageId("msg-" + id); - } + private PublishBatchRequest buildRequest(final int size) { + final List entries = new ArrayList<>(); - private static BatchResultErrorEntry failedEntry(final String id, final String code) { - return new BatchResultErrorEntry().withId(id).withCode(code).withMessage("error detail").withSenderFault(true); + IntStream.range(0, size).forEach(i -> { + entries.add(new PublishBatchRequestEntry().withId("id-" + i).withMessage("msg-" + i)); + }); + + return new PublishBatchRequest().withPublishBatchRequestEntries(entries); } - private static AmazonServiceException serviceException(final String code) { - final AmazonServiceException ex = new AmazonServiceException("Service error"); - ex.setErrorCode(code); - return ex; + private PublishBatchResult buildResult(final int successCount, final int failureCount) { + final List successful = new ArrayList<>(); + + IntStream.range(0, successCount).forEach(i -> { + successful.add(new PublishBatchResultEntry().withId("id-" + i).withMessageId("msg-id-" + i)); + }); + + final List failed = new java.util.ArrayList<>(); + + IntStream.range(0, failureCount).forEach(i -> { + failed.add(new BatchResultErrorEntry().withId("id-fail-" + i).withCode("ErrorCode").withMessage("error")); + }); + + return new PublishBatchResult().withSuccessful(successful).withFailed(failed); } -} -// @formatter:on \ No newline at end of file +} \ No newline at end of file diff --git a/amazon-sns-java-messaging-lib-v2/README.md b/amazon-sns-java-messaging-lib-v2/README.md new file mode 100644 index 0000000..517e576 --- /dev/null +++ b/amazon-sns-java-messaging-lib-v2/README.md @@ -0,0 +1,138 @@ +# amazon-sns-java-messaging-lib-v2 + +AWS SDK v2 implementation of the Amazon SNS Java Messaging Library. Provides batched message publishing to SNS using `software.amazon.awssdk:sns:2.20.162`. + +## Package Structure + +```text +com.amazon.sns.messaging.lib + ├── core/ + │ ├── AmazonSnsTemplate.java -- Public API entry point + │ ├── AmazonSnsProducerImpl.java -- Producer (enqueues requests) + │ ├── AmazonSnsConsumerImpl.java -- Consumer (calls SnsClient.publishBatch) + │ └── MessageAttributes.java -- Header-to-MessageAttributeValue converter + └── metrics/ + └── AmazonSnsConsumerMetricsDecorator.java -- Micrometer metrics decorator +``` + +## Key Classes + +| Class | Description | +|-------------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| `AmazonSnsTemplate` | Extends `AbstractAmazonSnsTemplate`. Primary API: `send()`, `shutdown()`, `await()`. Use the builder: `AmazonSnsTemplate.builder(snsClient, topicProperty)`. Deprecated constructors available for backward compatibility. | +| `AmazonSnsProducerImpl` | Extends `AbstractAmazonSnsProducer`. Thin wrapper that enqueues `RequestEntry` into a shared blocking queue. | +| `AmazonSnsConsumerImpl` | Extends `AbstractAmazonSnsConsumer`. Calls `SnsClient.publishBatch()` with v2 `PublishBatchRequest`/`PublishBatchResponse`. Handles per-entry success/failure from batch response. | +| `MessageAttributes` | Extends `AbstractMessageAttributes`. Converts header entries to v2 `MessageAttributeValue` objects (String, Number, Binary via `SdkBytes`, String.Array, Enum). | +| `AmazonSnsConsumerMetricsDecorator` | Extends `AbstractAmazonSnsConsumerMetricsDecorator`. Records publish attempts, latency, batch size, inflight count. Tags failures by `AwsServiceException` error code. | + +## Dependencies + +```text +com.github.mvallim:amazon-sns-java-messaging-lib-template:1.3.0 +software.amazon.awssdk:sns:2.20.162 +``` + +Test-only: + +```text +software.amazon.awssdk:sqs:2.20.162 (test scope) +``` + +## Usage + +### Standard SNS Topic + +```java +SnsClient snsClient = SnsClient.create(); + +TopicProperty topicProperty = TopicProperty.builder() + .fifo(false) + .linger(100) + .maxBatchSize(10) + .maximumPoolSize(20) + .topicArn("arn:aws:sns:us-east-2:000000000000:topic") + .build(); + +AmazonSnsTemplate snsTemplate = AmazonSnsTemplate.builder(snsClient, topicProperty).build(); + +RequestEntry entry = RequestEntry.builder() + .withValue(new MyMessage()) + .withMessageHeaders(Map.of("header1", "value1")) + .build(); + +snsTemplate.send(entry); +snsTemplate.shutdown(); +``` + +### FIFO SNS Topic + +```java +TopicProperty topicProperty = TopicProperty.builder() + .fifo(true) + .linger(100) + .maxBatchSize(10) + .maximumPoolSize(20) + .topicArn("arn:aws:sns:us-east-2:000000000000:topic.fifo") + .build(); + +AmazonSnsTemplate snsTemplate = AmazonSnsTemplate.builder(snsClient, topicProperty).build(); + +RequestEntry entry = RequestEntry.builder() + .withValue(new MyMessage()) + .withGroupId(UUID.randomUUID().toString()) + .withDeduplicationId(UUID.randomUUID().toString()) + .build(); + +snsTemplate.send(entry).addCallback( + success -> LOGGER.info("Sent: {}", success), + failure -> LOGGER.error("Failed: {}", failure) +); +snsTemplate.await().join(); +``` + +### With Custom ObjectMapper and Queue + +```java +AmazonSnsTemplate snsTemplate = AmazonSnsTemplate.builder(snsClient, topicProperty) + .objectMapper(new ObjectMapper()) + .topicRequests(new LinkedBlockingQueue<>(100)) + .publishDecorator(req -> req) + .build(); +``` + +### Using Micrometer Metrics + +```java +MeterRegistry meterRegistry = new CompositeMeterRegistry(); +meterRegistry.add(new JmxMeterRegistry()); + +AmazonSnsTemplate snsTemplate = AmazonSnsTemplate.builder(snsClient, topicProperty) + .meterRegistry(meterRegistry) + .build(); +``` + +## Metrics + +When a `MeterRegistry` is provided via `.meterRegistry()`, the following metrics are recorded: + +| Metric | Type | Tags | Description | +|--------------------------------|---------------------|-------------------------------------|-----------------------------| +| `sns.publish.attempts` | Counter | `topic` | Total PublishBatch attempts | +| `sns.publish.success` | Counter | `topic` | Successful messages | +| `sns.publish.failure` | Counter | `topic`, `error_code`, `error_type` | Failed messages | +| `sns.publish.duration` | Timer (p50/p95/p99) | `topic` | Publish latency | +| `sns.publish.batch.size` | DistributionSummary | `topic` | Messages per batch | +| `sns.publish.inflight` | Gauge | `topic` | In-flight publish batches | +| `blocking.queue.puts.total` | Counter | `name` | Successful put operations | +| `blocking.queue.puts.failed` | Counter | `name` | Failed put operations | +| `blocking.queue.put.duration` | Timer | `name` | Put latency | +| `blocking.queue.takes.total` | Counter | `name` | Successful take operations | +| `blocking.queue.takes.failed` | Counter | `name` | Failed take operations | +| `blocking.queue.take.duration` | Timer | `name` | Take latency | +| `blocking.queue.size` | Gauge | `name` | Queue depth | +| `executor.active` | Gauge | `name` | Active tasks | +| `executor.tasks.succeeded` | Counter | `name` | Successful tasks | +| `executor.tasks.failed` | Counter | `name` | Failed tasks | +| `executor.task.duration` | Timer | `name` | Task duration | + +See the [Technical Guide](../GUIDE.md#metrics-micrometer) for details. diff --git a/amazon-sns-java-messaging-lib-v2/pom.xml b/amazon-sns-java-messaging-lib-v2/pom.xml index cd3e837..a930d6f 100644 --- a/amazon-sns-java-messaging-lib-v2/pom.xml +++ b/amazon-sns-java-messaging-lib-v2/pom.xml @@ -7,7 +7,7 @@ com.github.mvallim amazon-sns-java-messaging-lib - 1.2.0-SNAPSHOT + 1.3.0-SNAPSHOT ../pom.xml diff --git a/amazon-sns-java-messaging-lib-v2/src/main/java/com/amazon/sns/messaging/lib/core/AmazonSnsConsumerImpl.java b/amazon-sns-java-messaging-lib-v2/src/main/java/com/amazon/sns/messaging/lib/core/AmazonSnsConsumerImpl.java index c88efb7..d98a09d 100644 --- a/amazon-sns-java-messaging-lib-v2/src/main/java/com/amazon/sns/messaging/lib/core/AmazonSnsConsumerImpl.java +++ b/amazon-sns-java-messaging-lib-v2/src/main/java/com/amazon/sns/messaging/lib/core/AmazonSnsConsumerImpl.java @@ -1,5 +1,5 @@ /* - * Copyright 2023 the original author or authors. + * Copyright 2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/amazon-sns-java-messaging-lib-v2/src/main/java/com/amazon/sns/messaging/lib/core/AmazonSnsProducerImpl.java b/amazon-sns-java-messaging-lib-v2/src/main/java/com/amazon/sns/messaging/lib/core/AmazonSnsProducerImpl.java index 8b06b39..15d002b 100644 --- a/amazon-sns-java-messaging-lib-v2/src/main/java/com/amazon/sns/messaging/lib/core/AmazonSnsProducerImpl.java +++ b/amazon-sns-java-messaging-lib-v2/src/main/java/com/amazon/sns/messaging/lib/core/AmazonSnsProducerImpl.java @@ -1,5 +1,5 @@ /* - * Copyright 2023 the original author or authors. + * Copyright 2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/amazon-sns-java-messaging-lib-v2/src/main/java/com/amazon/sns/messaging/lib/core/AmazonSnsTemplate.java b/amazon-sns-java-messaging-lib-v2/src/main/java/com/amazon/sns/messaging/lib/core/AmazonSnsTemplate.java index d9403ae..e631ceb 100644 --- a/amazon-sns-java-messaging-lib-v2/src/main/java/com/amazon/sns/messaging/lib/core/AmazonSnsTemplate.java +++ b/amazon-sns-java-messaging-lib-v2/src/main/java/com/amazon/sns/messaging/lib/core/AmazonSnsTemplate.java @@ -1,5 +1,5 @@ /* - * Copyright 2023 the original author or authors. + * Copyright 2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/amazon-sns-java-messaging-lib-v2/src/main/java/com/amazon/sns/messaging/lib/core/MessageAttributes.java b/amazon-sns-java-messaging-lib-v2/src/main/java/com/amazon/sns/messaging/lib/core/MessageAttributes.java index aff034d..0f5cd14 100644 --- a/amazon-sns-java-messaging-lib-v2/src/main/java/com/amazon/sns/messaging/lib/core/MessageAttributes.java +++ b/amazon-sns-java-messaging-lib-v2/src/main/java/com/amazon/sns/messaging/lib/core/MessageAttributes.java @@ -1,5 +1,5 @@ /* - * Copyright 2023 the original author or authors. + * Copyright 2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/amazon-sns-java-messaging-lib-v2/src/main/java/com/amazon/sns/messaging/lib/metrics/AmazonSnsConsumerMetricsDecorator.java b/amazon-sns-java-messaging-lib-v2/src/main/java/com/amazon/sns/messaging/lib/metrics/AmazonSnsConsumerMetricsDecorator.java index 775a53a..923dbe7 100644 --- a/amazon-sns-java-messaging-lib-v2/src/main/java/com/amazon/sns/messaging/lib/metrics/AmazonSnsConsumerMetricsDecorator.java +++ b/amazon-sns-java-messaging-lib-v2/src/main/java/com/amazon/sns/messaging/lib/metrics/AmazonSnsConsumerMetricsDecorator.java @@ -1,3 +1,19 @@ +/* + * Copyright 2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package com.amazon.sns.messaging.lib.metrics; import org.slf4j.Logger; @@ -7,6 +23,7 @@ import com.amazon.sns.messaging.lib.model.TopicProperty; import io.micrometer.core.instrument.MeterRegistry; +import lombok.SneakyThrows; import software.amazon.awssdk.awscore.exception.AwsServiceException; import software.amazon.awssdk.services.sns.model.PublishBatchRequest; import software.amazon.awssdk.services.sns.model.PublishBatchResponse; @@ -39,6 +56,7 @@ public AmazonSnsConsumerMetricsDecorator( * {@inheritDoc} */ @Override + @SneakyThrows public PublishBatchResponse publish(final PublishBatchRequest publishBatchRequest) { publishAttemptsCounter.increment(); batchSizeSummary.record(publishBatchRequest.publishBatchRequestEntries().size()); @@ -46,10 +64,6 @@ public PublishBatchResponse publish(final PublishBatchRequest publishBatchReques try { return publishTimer.recordCallable(() -> delegate.publish(publishBatchRequest)); - } catch (final RuntimeException ex) { - throw ex; - } catch (final Exception ex) { - throw new RuntimeException(ex); } finally { inflightGauge.decrementAndGet(); } diff --git a/amazon-sns-java-messaging-lib-v2/src/test/java/com/amazon/sns/messaging/lib/core/AmazonSnsProducerAsyncTest.java b/amazon-sns-java-messaging-lib-v2/src/test/java/com/amazon/sns/messaging/lib/core/AmazonSnsProducerAsyncTest.java index 5ccf1e5..ad7a5fc 100644 --- a/amazon-sns-java-messaging-lib-v2/src/test/java/com/amazon/sns/messaging/lib/core/AmazonSnsProducerAsyncTest.java +++ b/amazon-sns-java-messaging-lib-v2/src/test/java/com/amazon/sns/messaging/lib/core/AmazonSnsProducerAsyncTest.java @@ -1,5 +1,5 @@ /* - * Copyright 2023 the original author or authors. + * Copyright 2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/amazon-sns-java-messaging-lib-v2/src/test/java/com/amazon/sns/messaging/lib/core/AmazonSnsProducerSyncTest.java b/amazon-sns-java-messaging-lib-v2/src/test/java/com/amazon/sns/messaging/lib/core/AmazonSnsProducerSyncTest.java index 18d71e7..1c3691b 100644 --- a/amazon-sns-java-messaging-lib-v2/src/test/java/com/amazon/sns/messaging/lib/core/AmazonSnsProducerSyncTest.java +++ b/amazon-sns-java-messaging-lib-v2/src/test/java/com/amazon/sns/messaging/lib/core/AmazonSnsProducerSyncTest.java @@ -1,5 +1,5 @@ /* - * Copyright 2023 the original author or authors. + * Copyright 2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/amazon-sns-java-messaging-lib-v2/src/test/java/com/amazon/sns/messaging/lib/core/AmazonSnsTemplateIntegrationTest.java b/amazon-sns-java-messaging-lib-v2/src/test/java/com/amazon/sns/messaging/lib/core/AmazonSnsTemplateIntegrationTest.java index 3c1d6a1..237eb9e 100644 --- a/amazon-sns-java-messaging-lib-v2/src/test/java/com/amazon/sns/messaging/lib/core/AmazonSnsTemplateIntegrationTest.java +++ b/amazon-sns-java-messaging-lib-v2/src/test/java/com/amazon/sns/messaging/lib/core/AmazonSnsTemplateIntegrationTest.java @@ -1,5 +1,5 @@ /* - * Copyright 2023 the original author or authors. + * Copyright 2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/amazon-sns-java-messaging-lib-v2/src/test/java/com/amazon/sns/messaging/lib/core/MessageAttributesTest.java b/amazon-sns-java-messaging-lib-v2/src/test/java/com/amazon/sns/messaging/lib/core/MessageAttributesTest.java index 6399e4e..dd91e79 100644 --- a/amazon-sns-java-messaging-lib-v2/src/test/java/com/amazon/sns/messaging/lib/core/MessageAttributesTest.java +++ b/amazon-sns-java-messaging-lib-v2/src/test/java/com/amazon/sns/messaging/lib/core/MessageAttributesTest.java @@ -1,5 +1,5 @@ /* - * Copyright 2023 the original author or authors. + * Copyright 2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/amazon-sns-java-messaging-lib-v2/src/test/java/com/amazon/sns/messaging/lib/core/helper/ConsumerHelper.java b/amazon-sns-java-messaging-lib-v2/src/test/java/com/amazon/sns/messaging/lib/core/helper/ConsumerHelper.java index cbd68ea..e36d340 100644 --- a/amazon-sns-java-messaging-lib-v2/src/test/java/com/amazon/sns/messaging/lib/core/helper/ConsumerHelper.java +++ b/amazon-sns-java-messaging-lib-v2/src/test/java/com/amazon/sns/messaging/lib/core/helper/ConsumerHelper.java @@ -1,5 +1,5 @@ /* - * Copyright 2023 the original author or authors. + * Copyright 2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/amazon-sns-java-messaging-lib-v2/src/test/java/com/amazon/sns/messaging/lib/metrics/AmazonSnsConsumerMetricsDecoratorTest.java b/amazon-sns-java-messaging-lib-v2/src/test/java/com/amazon/sns/messaging/lib/metrics/AmazonSnsConsumerMetricsDecoratorTest.java index 179ef13..0d55a06 100644 --- a/amazon-sns-java-messaging-lib-v2/src/test/java/com/amazon/sns/messaging/lib/metrics/AmazonSnsConsumerMetricsDecoratorTest.java +++ b/amazon-sns-java-messaging-lib-v2/src/test/java/com/amazon/sns/messaging/lib/metrics/AmazonSnsConsumerMetricsDecoratorTest.java @@ -1,5 +1,5 @@ /* - * Copyright 2023 the original author or authors. + * Copyright 2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,32 +16,35 @@ package com.amazon.sns.messaging.lib.metrics; -import static org.assertj.core.api.Assertions.assertThat; -import static org.assertj.core.api.Assertions.assertThatThrownBy; -import static org.mockito.ArgumentMatchers.any; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.notNullValue; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -import java.util.Arrays; -import java.util.Collections; -import java.util.LinkedList; +import java.util.ArrayList; import java.util.List; +import java.util.concurrent.CompletableFuture; import java.util.stream.IntStream; +import org.apache.commons.lang3.reflect.FieldUtils; import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.DisplayName; -import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; +import org.mockito.Spy; import org.mockito.junit.jupiter.MockitoExtension; import com.amazon.sns.messaging.lib.core.AmazonSnsConsumer; import com.amazon.sns.messaging.lib.model.TopicProperty; import io.micrometer.core.instrument.Counter; -import io.micrometer.core.instrument.MeterRegistry; +import io.micrometer.core.instrument.DistributionSummary; +import io.micrometer.core.instrument.Gauge; +import io.micrometer.core.instrument.Timer; import io.micrometer.core.instrument.simple.SimpleMeterRegistry; import software.amazon.awssdk.awscore.exception.AwsErrorDetails; import software.amazon.awssdk.awscore.exception.AwsServiceException; @@ -55,7 +58,10 @@ @ExtendWith(MockitoExtension.class) class AmazonSnsConsumerMetricsDecoratorTest { - private static final String TOPIC_ARN = "arn:aws:sns:us-east-1:000000000000:my-topic"; + private static final String TOPIC_ARN = "arn:aws:sns:us-east-1:000000000000:test-topic"; + + @Spy + private SimpleMeterRegistry meterRegistry; @Mock private AmazonSnsConsumer delegate; @@ -63,416 +69,492 @@ class AmazonSnsConsumerMetricsDecoratorTest { @Mock private TopicProperty topicProperty; - private MeterRegistry registry; - - private AmazonSnsConsumerMetricsDecorator sut; + private AmazonSnsConsumerMetricsDecorator decorator; @BeforeEach void setUp() { - registry = new SimpleMeterRegistry(); when(topicProperty.getTopicArn()).thenReturn(TOPIC_ARN); - sut = new AmazonSnsConsumerMetricsDecorator(delegate, topicProperty, registry); + + decorator = new AmazonSnsConsumerMetricsDecorator(delegate, topicProperty, meterRegistry); } - @Nested - @DisplayName("publish()") - class Publish { + @Test + void testConstructorRegistersPublishAttemptsCounter() { + final Counter counter = meterRegistry.find(AbstractAmazonSnsConsumerMetricsDecorator.METRIC_PUBLISH_ATTEMPTS) + .tag("topic", TOPIC_ARN) + .counter(); - @Test - @DisplayName("should delegate to the wrapped consumer") - void shouldDelegateToWrappedConsumer() { - final PublishBatchRequest request = batchRequest(2); - when(delegate.publish(request)).thenReturn(successResult("id-1", "id-2")); + assertThat(counter, notNullValue()); + } - sut.publish(request); + @Test + void testConstructorRegistersPublishSuccessCounter() { + final Counter counter = meterRegistry.find(AbstractAmazonSnsConsumerMetricsDecorator.METRIC_PUBLISH_SUCCESS) + .tag("topic", TOPIC_ARN) + .counter(); - verify(delegate, times(1)).publish(request); - } + assertThat(counter, notNullValue()); + } - @Test - @DisplayName("should increment attempt counter on success") - void shouldIncrementAttemptCounterOnSuccess() { - when(delegate.publish(any())).thenReturn(successResult("id-1")); + @Test + void testConstructorRegistersPublishDurationTimer() { + final Timer timer = meterRegistry.find(AbstractAmazonSnsConsumerMetricsDecorator.METRIC_PUBLISH_DURATION) + .tag("topic", TOPIC_ARN) + .timer(); + + assertThat(timer, notNullValue()); + } + + @Test + void testConstructorRegistersPublishBatchSizeSummary() { + final DistributionSummary summary = meterRegistry.find(AbstractAmazonSnsConsumerMetricsDecorator.METRIC_PUBLISH_BATCH_SIZE) + .tag("topic", TOPIC_ARN) + .summary(); - sut.publish(batchRequest(1)); + assertThat(summary, notNullValue()); + } - assertThat(attemptsCount()).isEqualTo(1.0); - } + @Test + void testConstructorRegistersInflightGauge() { + final Gauge gauge = meterRegistry.find(AbstractAmazonSnsConsumerMetricsDecorator.METRIC_PUBLISH_INFLIGHT) + .tag("topic", TOPIC_ARN) + .gauge(); - @Test - @DisplayName("should increment attempt counter even when delegate throws") - void shouldIncrementAttemptCounterOnException() { - when(delegate.publish(any())).thenThrow(new RuntimeException("connection refused")); + assertThat(gauge, notNullValue()); + } - assertThatThrownBy(() -> sut.publish(batchRequest(1))).isInstanceOf(RuntimeException.class); + @Test + void testConstructorWithNullMeterRegistryDoesNotThrow() { + final AmazonSnsConsumerMetricsDecorator nullRegistryDecorator = new AmazonSnsConsumerMetricsDecorator(delegate, topicProperty, null); - assertThat(attemptsCount()).isEqualTo(1.0); - } + assertThat(nullRegistryDecorator, notNullValue()); + } - @Test - @DisplayName("should record batch size in distribution summary") - void shouldRecordBatchSize() { - when(delegate.publish(any())).thenReturn(successResult("a", "b", "c")); + @Test + void testPublishIncrementsAttemptsCounter() { + final PublishBatchRequest request = buildRequest(3); + when(delegate.publish(request)).thenReturn(PublishBatchResponse.builder().build()); - sut.publish(batchRequest(3)); + decorator.publish(request); - assertThat(batchSizeCount()).isEqualTo(1L); - assertThat(batchSizeMean()).isEqualTo(3.0); - } + final double count = meterRegistry.find(AbstractAmazonSnsConsumerMetricsDecorator.METRIC_PUBLISH_ATTEMPTS) + .tag("topic", TOPIC_ARN) + .counter() + .count(); - @Test - @DisplayName("should record duration in timer") - void shouldRecordDurationInTimer() { - when(delegate.publish(any())).thenReturn(successResult("x")); + assertThat(count, equalTo(1.0)); + } - sut.publish(batchRequest(1)); + @Test + void testPublishRecordsBatchSize() { + final PublishBatchRequest request = buildRequest(5); + when(delegate.publish(request)).thenReturn(PublishBatchResponse.builder().build()); - assertThat(timerCount()).isEqualTo(1L); - } + decorator.publish(request); - @Test - @DisplayName("should decrement inflight gauge after successful publish") - void shouldDecrementInflightAfterSuccess() { - when(delegate.publish(any())).thenReturn(successResult("y")); + final double mean = meterRegistry.find(AbstractAmazonSnsConsumerMetricsDecorator.METRIC_PUBLISH_BATCH_SIZE) + .tag("topic", TOPIC_ARN) + .summary() + .mean(); - sut.publish(batchRequest(1)); + assertThat(mean, equalTo(5.0)); + } - assertThat(inflightValue()).isZero(); - } + @Test + void testPublishRecordsTimer() { + final PublishBatchRequest request = buildRequest(2); + when(delegate.publish(request)).thenReturn(PublishBatchResponse.builder().build()); - @Test - @DisplayName("should decrement inflight gauge even when delegate throws") - void shouldDecrementInflightAfterException() { - when(delegate.publish(any())).thenThrow(new RuntimeException("timeout")); + decorator.publish(request); - assertThatThrownBy(() -> sut.publish(batchRequest(1))).isInstanceOf(RuntimeException.class); + final long timerCount = meterRegistry.find(AbstractAmazonSnsConsumerMetricsDecorator.METRIC_PUBLISH_DURATION) + .tag("topic", TOPIC_ARN) + .timer() + .count(); - assertThat(inflightValue()).isZero(); - } + assertThat(timerCount, equalTo(1L)); + } - @Test - @DisplayName("should propagate RuntimeException from delegate unchanged") - void shouldPropagateRuntimeException() { - final RuntimeException cause = new RuntimeException("sns down"); - when(delegate.publish(any())).thenThrow(cause); + @Test + void testPublishInflightGaugeIsZeroAfterCompletion() { + final PublishBatchRequest request = buildRequest(1); + when(delegate.publish(request)).thenReturn(PublishBatchResponse.builder().build()); - assertThatThrownBy(() -> sut.publish(batchRequest(1))).isSameAs(cause); - } + decorator.publish(request); - @Test - @DisplayName("should wrap checked Exception from delegate in RuntimeException") - void shouldWrapCheckedExceptionInRuntimeException() { - when(delegate.publish(any())).thenAnswer(inv -> { - throw new Exception("checked"); - }); + final double inflight = meterRegistry.find(AbstractAmazonSnsConsumerMetricsDecorator.METRIC_PUBLISH_INFLIGHT) + .tag("topic", TOPIC_ARN) + .gauge() + .value(); - assertThatThrownBy(() -> sut.publish(batchRequest(1))).isInstanceOf(RuntimeException.class).hasMessageContaining("checked"); - } + assertThat(inflight, equalTo(0.0)); } - @Nested - @DisplayName("handleResponse()") - class HandleResponse { - - @Test - @DisplayName("should delegate to the wrapped consumer") - void shouldDelegateToWrappedConsumer() { - final PublishBatchResponse result = successResult("id-1"); + @Test + void testPublishInflightGaugeIsZeroAfterException() { + final PublishBatchRequest request = buildRequest(1); + when(delegate.publish(request)).thenThrow(new RuntimeException("delegate error")); - sut.handleResponse(result); + assertThrows(RuntimeException.class, () -> decorator.publish(request)); - verify(delegate, times(1)).handleResponse(result); - } + final double inflight = meterRegistry.find(AbstractAmazonSnsConsumerMetricsDecorator.METRIC_PUBLISH_INFLIGHT) + .tag("topic", TOPIC_ARN) + .gauge() + .value(); - @Test - @DisplayName("should increment success counter once per successful message") - void shouldIncrementSuccessCounterPerMessage() { - final PublishBatchResponse result = successResult("id-1", "id-2", "id-3"); + assertThat(inflight, equalTo(0.0)); + } - sut.handleResponse(result); + @Test + void testPublishDelegatesCallToDelegate() { + final PublishBatchRequest request = buildRequest(2); + final PublishBatchResponse expectedResult = PublishBatchResponse.builder().build(); + when(delegate.publish(request)).thenReturn(expectedResult); - assertThat(successCount()).isEqualTo(3.0); - } + final PublishBatchResponse result = decorator.publish(request); - @Test - @DisplayName("should not increment success counter when there are no successful entries") - void shouldNotIncrementSuccessCounterWhenEmpty() { - final PublishBatchResponse result = PublishBatchResponse.builder() - .successful(Collections.emptyList()) - .failed(Collections.singleton(failedEntry("id-1", "InvalidParameter"))) - .build(); + assertThat(result, equalTo(expectedResult)); + verify(delegate, times(1)).publish(request); + } - sut.handleResponse(result); + @Test + void testPublishRethrowsRuntimeException() { + final PublishBatchRequest request = buildRequest(1); + when(delegate.publish(request)).thenThrow(new IllegalStateException("boom")); - assertThat(successCount()).isZero(); - } + final RuntimeException thrown = assertThrows(RuntimeException.class, () -> decorator.publish(request)); - @Test - @DisplayName("should increment failure counter once per failed message") - void shouldIncrementFailureCounterPerFailedMessage() { - final PublishBatchResponse result = PublishBatchResponse.builder() - .successful(Collections.emptyList()) - .failed(Arrays.asList(failedEntry("id-a", "InvalidParameter"), failedEntry("id-b", "MessageTooLong"))) - .build(); + assertThat(thrown, instanceOf(IllegalStateException.class)); + } - sut.handleResponse(result); + @Test + void testPublishMultipleTimesAccumulatesAttemptsCounter() { + final PublishBatchRequest request = buildRequest(1); + when(delegate.publish(request)).thenReturn(PublishBatchResponse.builder().build()); - assertThat(failureCountByCode("InvalidParameter")).isEqualTo(1.0); - assertThat(failureCountByCode("MessageTooLong")).isEqualTo(1.0); - } + decorator.publish(request); + decorator.publish(request); + decorator.publish(request); - @Test - @DisplayName("should accumulate failures with the same error code") - void shouldAccumulateFailuresWithSameErrorCode() { - final PublishBatchResponse result = PublishBatchResponse.builder() - .successful(Collections.emptyList()) - .failed(Arrays.asList(failedEntry("id-a", "InvalidParameter"), failedEntry("id-b", "InvalidParameter"), failedEntry("id-c", "InvalidParameter"))) - .build(); + final double count = meterRegistry.find(AbstractAmazonSnsConsumerMetricsDecorator.METRIC_PUBLISH_ATTEMPTS) + .tag("topic", TOPIC_ARN) + .counter() + .count(); - sut.handleResponse(result); + assertThat(count, equalTo(3.0)); + } - assertThat(failureCountByCode("InvalidParameter")).isEqualTo(3.0); - } + @Test + void testHandleResponseIncrementsSuccessCounter() { + final PublishBatchResponse result = buildResult(3, 0); - @Test - @DisplayName("should handle mixed batch with both successes and failures") - void shouldHandleMixedBatch() { - final PublishBatchResponse result = PublishBatchResponse.builder() - .successful(Arrays.asList(successEntry("id-ok-1"), successEntry("id-ok-2"))) - .failed(Collections.singleton(failedEntry("id-bad", "KMSDisabled"))) - .build(); + decorator.handleResponse(result); - sut.handleResponse(result); + final double count = meterRegistry.find(AbstractAmazonSnsConsumerMetricsDecorator.METRIC_PUBLISH_SUCCESS) + .tag("topic", TOPIC_ARN) + .counter() + .count(); - assertThat(successCount()).isEqualTo(2.0); - assertThat(failureCountByCode("KMSDisabled")).isEqualTo(1.0); - } + assertThat(count, equalTo(3.0)); + } - @Test - @DisplayName("should tag failures with error_type 'amazon_service_exception'") - void shouldTagFailuresWithAmazonErrorType() { - final PublishBatchResponse result = PublishBatchResponse.builder() - .successful(Collections.emptyList()) - .failed(Collections.singleton(failedEntry("id-x", "ThrottledException"))) - .build(); + @Test + void testHandleResponseDoesNotIncrementSuccessCounterWhenZeroSuccessful() { + final PublishBatchResponse result = buildResult(0, 2); - sut.handleResponse(result); + decorator.handleResponse(result); - final Counter failureCounter = registry.find(AbstractAmazonSnsConsumerMetricsDecorator.METRIC_PUBLISH_FAILURE) - .tag("error_code", "ThrottledException") - .tag("error_type", AbstractAmazonSnsConsumerMetricsDecorator.ERROR_TYPE_AMAZON) - .counter(); + final double count = meterRegistry.find(AbstractAmazonSnsConsumerMetricsDecorator.METRIC_PUBLISH_SUCCESS) + .tag("topic", TOPIC_ARN) + .counter() + .count(); - assertThat(failureCounter).isNotNull(); - assertThat(failureCounter.count()).isEqualTo(1.0); - } + assertThat(count, equalTo(0.0)); } - @Nested - @DisplayName("handleError()") - class HandleError { + @Test + void testHandleResponseIncrementsFailureCounterForEachFailedEntry() throws IllegalAccessException { + final PublishBatchResponse result = buildResult(0, 2); + FieldUtils.writeDeclaredField(result.failed().get(0), "code", "InvalidParameter", true); + FieldUtils.writeDeclaredField(result.failed().get(1), "code", "AuthorizationError", true); - @Test - @DisplayName("should delegate to the wrapped consumer") - void shouldDelegateToWrappedConsumer() { - final PublishBatchRequest request = batchRequest(1); - final RuntimeException cause = new RuntimeException("transport error"); + decorator.handleResponse(result); - sut.handleError(request, cause); + final double count1 = meterRegistry.find(AbstractAmazonSnsConsumerMetricsDecorator.METRIC_PUBLISH_FAILURE) + .tag("topic", TOPIC_ARN) + .tag(AbstractAmazonSnsConsumerMetricsDecorator.TAG_ERROR_CODE, "InvalidParameter") + .tag(AbstractAmazonSnsConsumerMetricsDecorator.TAG_ERROR_TYPE, AbstractAmazonSnsConsumerMetricsDecorator.ERROR_TYPE_AMAZON) + .counter() + .count(); - verify(delegate, times(1)).handleError(request, cause); - } + final double count2 = meterRegistry.find(AbstractAmazonSnsConsumerMetricsDecorator.METRIC_PUBLISH_FAILURE) + .tag("topic", TOPIC_ARN) + .tag(AbstractAmazonSnsConsumerMetricsDecorator.TAG_ERROR_CODE, "AuthorizationError") + .tag(AbstractAmazonSnsConsumerMetricsDecorator.TAG_ERROR_TYPE, AbstractAmazonSnsConsumerMetricsDecorator.ERROR_TYPE_AMAZON) + .counter() + .count(); - @Test - @DisplayName("should count all batch entries as failures on AwsServiceException") - void shouldCountAllEntriesAsFailures_onAwsServiceException() { - final PublishBatchRequest request = batchRequest(3); - final AwsServiceException cause = serviceException("InternalError"); + assertThat(count1, equalTo(1.0)); + assertThat(count2, equalTo(1.0)); + } - sut.handleError(request, cause); + @Test + void testHandleResponseDelegatesCallToDelegate() { + final PublishBatchResponse result = buildResult(1, 0); - assertThat(failureCountByCode("InternalError")).isEqualTo(3.0); - } + decorator.handleResponse(result); - @Test - @DisplayName("should tag AwsServiceException failures with correct error_type") - void shouldTagAwsServiceExceptionWithCorrectErrorType() { - final PublishBatchRequest request = batchRequest(1); - final AwsServiceException cause = serviceException("InternalError"); + verify(delegate, times(1)).handleResponse(result); + } - sut.handleError(request, cause); + @Test + void testHandleResponseWithMixedSuccessAndFailure() throws IllegalAccessException { + final PublishBatchResponse result = buildResult(2, 1); + FieldUtils.writeDeclaredField(result.failed().get(0), "code", "Throttling", true); - final Counter counter = registry.find(AbstractAmazonSnsConsumerMetricsDecorator.METRIC_PUBLISH_FAILURE) - .tag("error_code", "InternalError") - .tag("error_type", AbstractAmazonSnsConsumerMetricsDecorator.ERROR_TYPE_AMAZON) - .counter(); + decorator.handleResponse(result); - assertThat(counter).isNotNull(); - assertThat(counter.count()).isEqualTo(1.0); - } + final double successCount = meterRegistry.find(AbstractAmazonSnsConsumerMetricsDecorator.METRIC_PUBLISH_SUCCESS) + .tag("topic", TOPIC_ARN) + .counter() + .count(); - @Test - @DisplayName("should use error code '000' for non-AwsServiceException") - void shouldUseDefaultErrorCode_forGenericException() { - final PublishBatchRequest request = batchRequest(2); + final double failureCount = meterRegistry.find(AbstractAmazonSnsConsumerMetricsDecorator.METRIC_PUBLISH_FAILURE) + .tag("topic", TOPIC_ARN) + .tag(AbstractAmazonSnsConsumerMetricsDecorator.TAG_ERROR_CODE, "Throttling") + .tag(AbstractAmazonSnsConsumerMetricsDecorator.TAG_ERROR_TYPE, AbstractAmazonSnsConsumerMetricsDecorator.ERROR_TYPE_AMAZON) + .counter() + .count(); - sut.handleError(request, new RuntimeException("network timeout")); + assertThat(successCount, equalTo(2.0)); + assertThat(failureCount, equalTo(1.0)); + } - assertThat(failureCountByCode("000")).isEqualTo(2.0); - } + @Test + void testHandleResponseAccumulatesSuccessCounterAcrossMultipleCalls() { + decorator.handleResponse(buildResult(2, 0)); + decorator.handleResponse(buildResult(3, 0)); - @Test - @DisplayName("should tag generic exceptions with error_type 'unknown'") - void shouldTagGenericExceptionsWithUnknownErrorType() { - final PublishBatchRequest request = batchRequest(1); + final double count = meterRegistry.find(AbstractAmazonSnsConsumerMetricsDecorator.METRIC_PUBLISH_SUCCESS) + .tag("topic", TOPIC_ARN) + .counter() + .count(); - sut.handleError(request, new RuntimeException("network timeout")); + assertThat(count, equalTo(5.0)); + } - final Counter counter = registry.find(AbstractAmazonSnsConsumerMetricsDecorator.METRIC_PUBLISH_FAILURE) - .tag("error_code", "000") - .tag("error_type", AbstractAmazonSnsConsumerMetricsDecorator.ERROR_TYPE_OTHER) - .counter(); + @Test + void testHandleErrorWithAwsServiceExceptionIncrementsFailureCounter() { + final PublishBatchRequest request = buildRequest(3); + final AwsServiceException ex = AwsServiceException.builder() + .awsErrorDetails(AwsErrorDetails.builder().errorCode("ServiceUnavailable").build()) + .message("Service error") + .build(); - assertThat(counter).isNotNull(); - assertThat(counter.count()).isEqualTo(1.0); - } + decorator.handleError(request, ex); - @Test - @DisplayName("should count each entry individually when batch has multiple entries") - void shouldCountEachEntryIndividually() { - final PublishBatchRequest request = batchRequest(5); - final AwsServiceException cause = serviceException("ServiceUnavailable"); + final double count = meterRegistry.find(AbstractAmazonSnsConsumerMetricsDecorator.METRIC_PUBLISH_FAILURE) + .tag("topic", TOPIC_ARN) + .tag(AbstractAmazonSnsConsumerMetricsDecorator.TAG_ERROR_CODE, "ServiceUnavailable") + .tag(AbstractAmazonSnsConsumerMetricsDecorator.TAG_ERROR_TYPE, AbstractAmazonSnsConsumerMetricsDecorator.ERROR_TYPE_AMAZON) + .counter() + .count(); + + assertThat(count, equalTo(3.0)); + } - sut.handleError(request, cause); + @Test + void testHandleErrorWithUnknownExceptionIncrementsFailureCounterWithCode000() { + final PublishBatchRequest request = buildRequest(2); + final RuntimeException ex = new RuntimeException("unexpected"); + + decorator.handleError(request, ex); + + final double count = meterRegistry.find(AbstractAmazonSnsConsumerMetricsDecorator.METRIC_PUBLISH_FAILURE) + .tag("topic", TOPIC_ARN) + .tag(AbstractAmazonSnsConsumerMetricsDecorator.TAG_ERROR_CODE, "000") + .tag(AbstractAmazonSnsConsumerMetricsDecorator.TAG_ERROR_TYPE, AbstractAmazonSnsConsumerMetricsDecorator.ERROR_TYPE_OTHER) + .counter() + .count(); - assertThat(failureCountByCode("ServiceUnavailable")).isEqualTo(5.0); - } + assertThat(count, equalTo(2.0)); } - @Nested - @DisplayName("lifecycle methods") - class Lifecycle { + @Test + void testHandleErrorWithAwsServiceExceptionUsesAmazonErrorType() { + final PublishBatchRequest request = buildRequest(1); + + final AwsServiceException ex = AwsServiceException.builder() + .awsErrorDetails(AwsErrorDetails.builder().errorCode("AccessDenied").build()) + .message("error") + .build(); + + decorator.handleError(request, ex); - @Test - @DisplayName("shutdown() should delegate to the wrapped consumer") - void shutdown_shouldDelegate() { - sut.shutdown(); - verify(delegate, times(1)).shutdown(); - } + final Counter counter = meterRegistry.find(AbstractAmazonSnsConsumerMetricsDecorator.METRIC_PUBLISH_FAILURE) + .tag("topic", TOPIC_ARN) + .tag(AbstractAmazonSnsConsumerMetricsDecorator.TAG_ERROR_TYPE, AbstractAmazonSnsConsumerMetricsDecorator.ERROR_TYPE_AMAZON) + .counter(); - @Test - @DisplayName("await() should delegate to the wrapped consumer") - void await_shouldDelegate() { - sut.await(); - verify(delegate, times(1)).await(); - } + assertThat(counter, notNullValue()); + assertThat(counter.count(), equalTo(1.0)); } - @Nested - @DisplayName("null MeterRegistry") - class NullRegistry { + @Test + void testHandleErrorWithNonAmazonExceptionUsesUnknownErrorType() { + final PublishBatchRequest request = buildRequest(1); + final NullPointerException ex = new NullPointerException("npe"); - @Test - @DisplayName("should not throw when MeterRegistry is null") - void shouldNotThrowWhenRegistryIsNull() { - final AmazonSnsConsumerMetricsDecorator nullRegistrySut = new AmazonSnsConsumerMetricsDecorator(delegate, topicProperty, null); + decorator.handleError(request, ex); - when(delegate.publish(any())).thenReturn(successResult("id-1")); + final Counter counter = meterRegistry.find(AbstractAmazonSnsConsumerMetricsDecorator.METRIC_PUBLISH_FAILURE) + .tag("topic", TOPIC_ARN) + .tag(AbstractAmazonSnsConsumerMetricsDecorator.TAG_ERROR_TYPE, AbstractAmazonSnsConsumerMetricsDecorator.ERROR_TYPE_OTHER) + .counter(); - // should execute without NullPointerException - nullRegistrySut.publish(batchRequest(1)); - } + assertThat(counter, notNullValue()); + assertThat(counter.count(), equalTo(1.0)); } - private double attemptsCount() { - return registry.find(AbstractAmazonSnsConsumerMetricsDecorator.METRIC_PUBLISH_ATTEMPTS) + @Test + void testHandleErrorDelegatesCallToDelegate() { + final PublishBatchRequest request = buildRequest(1); + final RuntimeException ex = new RuntimeException("err"); + + decorator.handleError(request, ex); + + verify(delegate, times(1)).handleError(request, ex); + } + + @Test + void testHandleErrorCountsAllEntriesInBatch() { + final PublishBatchRequest request = buildRequest(10); + final RuntimeException ex = new RuntimeException("bulk failure"); + + decorator.handleError(request, ex); + + final double count = meterRegistry.find(AbstractAmazonSnsConsumerMetricsDecorator.METRIC_PUBLISH_FAILURE) .tag("topic", TOPIC_ARN) + .tag(AbstractAmazonSnsConsumerMetricsDecorator.TAG_ERROR_CODE, "000") + .tag(AbstractAmazonSnsConsumerMetricsDecorator.TAG_ERROR_TYPE, AbstractAmazonSnsConsumerMetricsDecorator.ERROR_TYPE_OTHER) .counter() .count(); + + assertThat(count, equalTo(10.0)); } - private double successCount() { - final Counter c = registry.find(AbstractAmazonSnsConsumerMetricsDecorator.METRIC_PUBLISH_SUCCESS) - .tag("topic", TOPIC_ARN) - .counter(); + @Test + void testShutdownDelegatesCallToDelegate() { + decorator.shutdown(); + verify(delegate, times(1)).shutdown(); + } - return c == null ? 0.0 : c.count(); + @Test + void testAwaitDelegatesCallToDelegate() { + when(delegate.await()).thenReturn(CompletableFuture.completedFuture(null)); + + decorator.await(); + + verify(delegate, times(1)).await(); } - private double failureCountByCode(final String code) { - final Counter c = registry.find(AbstractAmazonSnsConsumerMetricsDecorator.METRIC_PUBLISH_FAILURE) - .tag("topic", TOPIC_ARN) - .tag("error_code", code) - .counter(); + @Test + void testAwaitReturnsTheDelegatesFuture() { + final CompletableFuture expected = CompletableFuture.completedFuture(null); + when(delegate.await()).thenReturn(expected); + + final CompletableFuture result = decorator.await(); - return c == null ? 0.0 : c.count(); + assertThat(result, equalTo(expected)); } - private long batchSizeCount() { - return registry.find(AbstractAmazonSnsConsumerMetricsDecorator.METRIC_PUBLISH_BATCH_SIZE) + @Test + void testFullSuccessfulPublishFlowUpdatesAllMetrics() { + final PublishBatchRequest request = buildRequest(4); + final PublishBatchResponse batchResult = buildResult(4, 0); + when(delegate.publish(request)).thenReturn(batchResult); + + final PublishBatchResponse result = decorator.publish(request); + decorator.handleResponse(result); + + final double attempts = meterRegistry.find(AbstractAmazonSnsConsumerMetricsDecorator.METRIC_PUBLISH_ATTEMPTS) .tag("topic", TOPIC_ARN) - .summary() + .counter() + .count(); + + final double success = meterRegistry.find(AbstractAmazonSnsConsumerMetricsDecorator.METRIC_PUBLISH_SUCCESS) + .tag("topic", TOPIC_ARN) + .counter() .count(); - } - private double batchSizeMean() { - return registry.find(AbstractAmazonSnsConsumerMetricsDecorator.METRIC_PUBLISH_BATCH_SIZE) + final double batchMean = meterRegistry.find(AbstractAmazonSnsConsumerMetricsDecorator.METRIC_PUBLISH_BATCH_SIZE) .tag("topic", TOPIC_ARN) .summary() .mean(); - } - private long timerCount() { - return registry.find(AbstractAmazonSnsConsumerMetricsDecorator.METRIC_PUBLISH_DURATION) + final long timerCount = meterRegistry.find(AbstractAmazonSnsConsumerMetricsDecorator.METRIC_PUBLISH_DURATION) .tag("topic", TOPIC_ARN) .timer() .count(); + + assertThat(attempts, equalTo(1.0)); + assertThat(success, equalTo(4.0)); + assertThat(batchMean, equalTo(4.0)); + assertThat(timerCount, equalTo(1L)); } - private double inflightValue() { - return registry.find(AbstractAmazonSnsConsumerMetricsDecorator.METRIC_PUBLISH_INFLIGHT) + @Test + void testFullErrorFlowUpdatesFailureMetrics() { + final PublishBatchRequest request = buildRequest(3); + final AwsServiceException ex = AwsServiceException.builder() + .awsErrorDetails(AwsErrorDetails.builder().errorCode("RequestExpired").build()) + .message("timeout") + .build(); + + when(delegate.publish(request)).thenThrow(ex); + + assertThrows(RuntimeException.class, () -> decorator.publish(request)); + decorator.handleError(request, ex); + + final double attempts = meterRegistry.find(AbstractAmazonSnsConsumerMetricsDecorator.METRIC_PUBLISH_ATTEMPTS) .tag("topic", TOPIC_ARN) - .gauge() - .value(); + .counter() + .count(); + + final double failures = meterRegistry.find(AbstractAmazonSnsConsumerMetricsDecorator.METRIC_PUBLISH_FAILURE) + .tag("topic", TOPIC_ARN) + .tag(AbstractAmazonSnsConsumerMetricsDecorator.TAG_ERROR_CODE, "RequestExpired") + .tag(AbstractAmazonSnsConsumerMetricsDecorator.TAG_ERROR_TYPE, AbstractAmazonSnsConsumerMetricsDecorator.ERROR_TYPE_AMAZON) + .counter().count(); + + assertThat(attempts, equalTo(1.0)); + assertThat(failures, equalTo(3.0)); } - private static PublishBatchRequest batchRequest(final int count) { - final List entries = new LinkedList<>(); + private PublishBatchRequest buildRequest(final int size) { + final List entries = new ArrayList<>(); - IntStream.range(0, count).forEach(i -> { + IntStream.range(0, size).forEach(i -> { entries.add(PublishBatchRequestEntry.builder().id("id-" + i).message("msg-" + i).build()); }); - return PublishBatchRequest.builder().topicArn(TOPIC_ARN).publishBatchRequestEntries(entries).build(); + return PublishBatchRequest.builder().publishBatchRequestEntries(entries).build(); } - private static PublishBatchResponse successResult(final String... ids) { - - final List successEntries = new LinkedList<>(); + private PublishBatchResponse buildResult(final int successCount, final int failureCount) { + final List successful = new ArrayList<>(); - for (final String id : ids) { - successEntries.add(successEntry(id)); - } - - return PublishBatchResponse.builder().successful(successEntries).failed(Collections.emptyList()).build(); - } + IntStream.range(0, successCount).forEach(i -> { + successful.add(PublishBatchResultEntry.builder().id("id-" + i).messageId("msg-id-" + i).build()); + }); - private static PublishBatchResultEntry successEntry(final String id) { - return PublishBatchResultEntry.builder().id(id).messageId("msg-" + id).build(); - } + final List failed = new java.util.ArrayList<>(); - private static BatchResultErrorEntry failedEntry(final String id, final String code) { - return BatchResultErrorEntry.builder().id(id).code(code).message("error detail").senderFault(true).build(); - } + IntStream.range(0, failureCount).forEach(i -> { + failed.add(BatchResultErrorEntry.builder().id("id-fail-" + i).code("ErrorCode").message("error").build()); + }); - private static AwsServiceException serviceException(final String code) { - return AwsServiceException.builder() - .message("Service error") - .awsErrorDetails(AwsErrorDetails.builder() - .errorCode(code) - .build() - ).build(); + return PublishBatchResponse.builder().successful(successful).failed(failed).build(); } -} -// @formatter:on \ No newline at end of file +} \ No newline at end of file diff --git a/pom.xml b/pom.xml index e23ce3f..0ccf534 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ com.github.mvallim amazon-sns-java-messaging-lib - 1.2.0-SNAPSHOT + 1.3.0-SNAPSHOT pom ${project.artifactId}