Skip to content

Commit 05c3eee

Browse files
authored
feat: Allow fine tuning sqs pooling (#737)
* feat: Allow fine tuning sqs pooling * chore: PR suggestions * chore: PR suggestions and unit tests
1 parent 58de92a commit 05c3eee

9 files changed

Lines changed: 926 additions & 235 deletions

File tree

docs/configuration/index.mdx

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,22 @@ This table lists the environment variables and their default values.
6969
| `AWS_ACCOUNT_ID` | (none) | `<aws account id>` | Required when `QUEUE_BACKEND=sqs` and `SQS_QUEUE_URL_PREFIX` is not set. Used to construct default SQS queue URLs. |
7070
| `SQS_QUEUE_URL_PREFIX` | (none) | `<sqs queue url prefix>` | Optional override for SQS queue URL prefix (example: `https://sqs.us-east-1.amazonaws.com/123456789012/relayer-`). When set, `AWS_ACCOUNT_ID` is not required. |
7171
| `SQS_QUEUE_TYPE` | `auto` | `auto, standard, fifo` | SQS queue type. `auto` (default) probes the `transaction-request` queue at startup to detect the type. `standard` and `fifo` skip probing and use the specified type directly. |
72+
| `SQS_TRANSACTION_REQUEST_WAIT_TIME_SECONDS` | `15` | `0-20` | SQS long-poll `WaitTimeSeconds` for the transaction request queue. Lower values reduce pickup latency on bursty queues at the cost of more SQS API calls during idle periods. Only applies when `QUEUE_BACKEND=sqs`. |
73+
| `SQS_TRANSACTION_SUBMISSION_WAIT_TIME_SECONDS` | `15` | `0-20` | SQS long-poll `WaitTimeSeconds` for the transaction submission queue. Same trade-off as the request queue setting above. |
74+
| `SQS_STATUS_CHECK_WAIT_TIME_SECONDS` | `5` | `0-20` | SQS long-poll `WaitTimeSeconds` for the generic status check queue. |
75+
| `SQS_STATUS_CHECK_EVM_WAIT_TIME_SECONDS` | `5` | `0-20` | SQS long-poll `WaitTimeSeconds` for the EVM status check queue. |
76+
| `SQS_STATUS_CHECK_STELLAR_WAIT_TIME_SECONDS` | `3` | `0-20` | SQS long-poll `WaitTimeSeconds` for the Stellar status check queue. |
77+
| `SQS_NOTIFICATION_WAIT_TIME_SECONDS` | `20` | `0-20` | SQS long-poll `WaitTimeSeconds` for the notification queue. |
78+
| `SQS_TOKEN_SWAP_REQUEST_WAIT_TIME_SECONDS` | `20` | `0-20` | SQS long-poll `WaitTimeSeconds` for the token swap request queue. |
79+
| `SQS_RELAYER_HEALTH_CHECK_WAIT_TIME_SECONDS` | `20` | `0-20` | SQS long-poll `WaitTimeSeconds` for the relayer health check queue. |
80+
| `SQS_TRANSACTION_REQUEST_POLLER_COUNT` | `1` | `<any positive number>` | Number of concurrent SQS `ReceiveMessage` loops for the transaction request queue per task. More pollers improve message pickup smoothness on bursty queues. All pollers share the same handler concurrency semaphore. |
81+
| `SQS_TRANSACTION_SUBMISSION_POLLER_COUNT` | `1` | `<any positive number>` | Number of concurrent SQS `ReceiveMessage` loops for the transaction submission queue per task. |
82+
| `SQS_STATUS_CHECK_POLLER_COUNT` | `1` | `<any positive number>` | Number of concurrent SQS `ReceiveMessage` loops for the generic status check queue per task. |
83+
| `SQS_STATUS_CHECK_EVM_POLLER_COUNT` | `1` | `<any positive number>` | Number of concurrent SQS `ReceiveMessage` loops for the EVM status check queue per task. |
84+
| `SQS_STATUS_CHECK_STELLAR_POLLER_COUNT` | `1` | `<any positive number>` | Number of concurrent SQS `ReceiveMessage` loops for the Stellar status check queue per task. |
85+
| `SQS_NOTIFICATION_POLLER_COUNT` | `1` | `<any positive number>` | Number of concurrent SQS `ReceiveMessage` loops for the notification queue per task. |
86+
| `SQS_TOKEN_SWAP_REQUEST_POLLER_COUNT` | `1` | `<any positive number>` | Number of concurrent SQS `ReceiveMessage` loops for the token swap request queue per task. |
87+
| `SQS_RELAYER_HEALTH_CHECK_POLLER_COUNT` | `1` | `<any positive number>` | Number of concurrent SQS `ReceiveMessage` loops for the relayer health check queue per task. |
7288
| `DISTRIBUTED_MODE` | `false` | `bool` (`true/false`, `1/0`) | Enables Redis-based distributed locks for cron/cleanup tasks in multi-instance deployments, preventing duplicate scheduled execution across nodes. |
7389
| `STORAGE_ENCRYPTION_KEY` | `` | `string` | Encryption key used to encrypt data at rest in Redis storage. See [Storage Configuration](./configuration/storage) for security details. |
7490
| `RPC_TIMEOUT_MS` | `10000` | `<timeout in milliseconds>` | Sets the maximum time to wait for RPC connections before timing out. |
@@ -246,6 +262,40 @@ aws sqs set-queue-attributes \
246262
--attributes '{"RedrivePolicy":"{\"deadLetterTargetArn\":\"<DLQ_ARN>\",\"maxReceiveCount\":\"6\"}"}'
247263
```
248264

265+
### SQS performance tuning
266+
267+
When running with `QUEUE_BACKEND=sqs`, two configuration dimensions affect message pickup latency:
268+
269+
**Wait time** (`SQS_*_WAIT_TIME_SECONDS`) controls the SQS `WaitTimeSeconds` parameter — how long each `ReceiveMessage` call blocks when the queue is empty. Lower values reduce worst-case pickup delay at the cost of more API calls during idle periods.
270+
271+
**Poller count** (`SQS_*_POLLER_COUNT`) controls how many concurrent `ReceiveMessage` loops run per queue per task. Each task runs one poll loop per queue by default. With multiple relayer tasks (instances), you get one poller per task — but during traffic bursts, messages can still sit visible briefly between poll cycles. Increasing the poller count adds more concurrent receivers sharing the same handler concurrency semaphore, improving pickup smoothness without increasing processing concurrency.
272+
273+
For high-throughput deployments (>50k transactions/hour), consider:
274+
275+
```bash
276+
# Reduce long-poll wait from default 15s to 2s for the hot-path queues
277+
SQS_TRANSACTION_REQUEST_WAIT_TIME_SECONDS=2
278+
SQS_TRANSACTION_SUBMISSION_WAIT_TIME_SECONDS=2
279+
280+
# Run 3 poll loops per queue per task for smoother pickup
281+
SQS_TRANSACTION_REQUEST_POLLER_COUNT=3
282+
SQS_TRANSACTION_SUBMISSION_POLLER_COUNT=3
283+
```
284+
285+
**Monitoring pickup latency**: The `transaction_processing_seconds` histogram exposes segment-level stages to help identify bottlenecks:
286+
287+
| Stage label | What it measures |
288+
| --- | --- |
289+
| `request_queue_dwell` | Time from transaction creation to request handler start (queue wait) |
290+
| `prepare_duration` | Time spent preparing the transaction (simulation, signing, fee estimation) |
291+
| `submission_queue_dwell` | Time from submission job enqueue to submission handler start (queue wait) |
292+
| `submit_duration` | Time spent submitting the transaction to the network (RPC call) |
293+
| `creation_to_submission` | End-to-end time from creation to network submission |
294+
| `submission_to_confirmation` | Time from network submission to on-chain confirmation |
295+
| `creation_to_confirmation` | Total lifecycle from creation to confirmation |
296+
297+
Use the dwell-time stages to determine whether tail latency comes from queue pickup delays (tune wait time/poller count) or from handler processing (tune RPC provider or concurrency).
298+
249299
## Main configuration file (config.json)
250300

251301
This file can exist in any directory, but the default location is `./config/config.json`.

src/config/server_config.rs

Lines changed: 132 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -673,6 +673,38 @@ impl ServerConfig {
673673
.and_then(|v| v.parse().ok())
674674
.unwrap_or(default)
675675
}
676+
677+
/// Get SQS wait time from environment variable or use default.
678+
///
679+
/// Environment variable format: `SQS_{QUEUE_KEY}_WAIT_TIME_SECONDS`
680+
/// Example: `SQS_TRANSACTION_REQUEST_WAIT_TIME_SECONDS=2`
681+
///
682+
/// Values are clamped to the SQS maximum of 20 seconds.
683+
pub fn get_sqs_wait_time(queue_key: &str, default: u64) -> u64 {
684+
let env_var = format!("SQS_{queue_key}_WAIT_TIME_SECONDS");
685+
env::var(&env_var)
686+
.ok()
687+
.and_then(|v| v.parse().ok())
688+
.unwrap_or(default)
689+
.min(20)
690+
}
691+
692+
/// Get SQS poller count from environment variable or use default.
693+
///
694+
/// Environment variable format: `SQS_{QUEUE_KEY}_POLLER_COUNT`
695+
/// Example: `SQS_TRANSACTION_REQUEST_POLLER_COUNT=4`
696+
///
697+
/// Controls how many concurrent SQS `ReceiveMessage` loops run per queue
698+
/// per task. More pollers improve pickup smoothness on bursty queues.
699+
/// All pollers share the same concurrency semaphore.
700+
pub fn get_sqs_poller_count(queue_key: &str, default: usize) -> usize {
701+
let env_var = format!("SQS_{queue_key}_POLLER_COUNT");
702+
env::var(&env_var)
703+
.ok()
704+
.and_then(|v| v.parse().ok())
705+
.unwrap_or(default)
706+
.max(1)
707+
}
676708
}
677709

678710
#[cfg(test)]
@@ -2031,4 +2063,104 @@ mod tests {
20312063
env::remove_var("API_KEY");
20322064
}
20332065
}
2066+
2067+
mod get_sqs_wait_time_tests {
2068+
use super::*;
2069+
use serial_test::serial;
2070+
2071+
#[test]
2072+
#[serial]
2073+
fn test_returns_default_when_env_not_set() {
2074+
env::remove_var("SQS_TEST_QUEUE_WAIT_TIME_SECONDS");
2075+
let result = ServerConfig::get_sqs_wait_time("TEST_QUEUE", 5);
2076+
assert_eq!(result, 5, "Should return default when env var is not set");
2077+
}
2078+
2079+
#[test]
2080+
#[serial]
2081+
fn test_returns_parsed_value() {
2082+
env::set_var("SQS_TEST_QUEUE_WAIT_TIME_SECONDS", "10");
2083+
let result = ServerConfig::get_sqs_wait_time("TEST_QUEUE", 5);
2084+
assert_eq!(result, 10, "Should return parsed value");
2085+
env::remove_var("SQS_TEST_QUEUE_WAIT_TIME_SECONDS");
2086+
}
2087+
2088+
#[test]
2089+
#[serial]
2090+
fn test_returns_default_when_invalid() {
2091+
env::set_var("SQS_TEST_QUEUE_WAIT_TIME_SECONDS", "not_a_number");
2092+
let result = ServerConfig::get_sqs_wait_time("TEST_QUEUE", 5);
2093+
assert_eq!(result, 5, "Should return default for non-numeric input");
2094+
env::remove_var("SQS_TEST_QUEUE_WAIT_TIME_SECONDS");
2095+
}
2096+
2097+
#[test]
2098+
#[serial]
2099+
fn test_clamps_to_sqs_maximum_of_20() {
2100+
env::set_var("SQS_TEST_QUEUE_WAIT_TIME_SECONDS", "30");
2101+
let result = ServerConfig::get_sqs_wait_time("TEST_QUEUE", 5);
2102+
assert_eq!(result, 20, "Should clamp to SQS maximum of 20 seconds");
2103+
env::remove_var("SQS_TEST_QUEUE_WAIT_TIME_SECONDS");
2104+
}
2105+
2106+
#[test]
2107+
#[serial]
2108+
fn test_allows_zero() {
2109+
env::set_var("SQS_TEST_QUEUE_WAIT_TIME_SECONDS", "0");
2110+
let result = ServerConfig::get_sqs_wait_time("TEST_QUEUE", 5);
2111+
assert_eq!(result, 0, "Should allow zero (short polling)");
2112+
env::remove_var("SQS_TEST_QUEUE_WAIT_TIME_SECONDS");
2113+
}
2114+
}
2115+
2116+
mod get_sqs_poller_count_tests {
2117+
use super::*;
2118+
use serial_test::serial;
2119+
2120+
#[test]
2121+
#[serial]
2122+
fn test_returns_default_when_env_not_set() {
2123+
env::remove_var("SQS_TEST_QUEUE_POLLER_COUNT");
2124+
let result = ServerConfig::get_sqs_poller_count("TEST_QUEUE", 2);
2125+
assert_eq!(result, 2, "Should return default when env var is not set");
2126+
}
2127+
2128+
#[test]
2129+
#[serial]
2130+
fn test_returns_parsed_value() {
2131+
env::set_var("SQS_TEST_QUEUE_POLLER_COUNT", "4");
2132+
let result = ServerConfig::get_sqs_poller_count("TEST_QUEUE", 2);
2133+
assert_eq!(result, 4, "Should return parsed value");
2134+
env::remove_var("SQS_TEST_QUEUE_POLLER_COUNT");
2135+
}
2136+
2137+
#[test]
2138+
#[serial]
2139+
fn test_returns_default_when_invalid() {
2140+
env::set_var("SQS_TEST_QUEUE_POLLER_COUNT", "not_a_number");
2141+
let result = ServerConfig::get_sqs_poller_count("TEST_QUEUE", 2);
2142+
assert_eq!(result, 2, "Should return default for non-numeric input");
2143+
env::remove_var("SQS_TEST_QUEUE_POLLER_COUNT");
2144+
}
2145+
2146+
#[test]
2147+
#[serial]
2148+
fn test_clamps_zero_to_minimum_of_1() {
2149+
env::set_var("SQS_TEST_QUEUE_POLLER_COUNT", "0");
2150+
let result = ServerConfig::get_sqs_poller_count("TEST_QUEUE", 2);
2151+
assert_eq!(result, 1, "Should clamp zero to minimum of 1");
2152+
env::remove_var("SQS_TEST_QUEUE_POLLER_COUNT");
2153+
}
2154+
2155+
#[test]
2156+
#[serial]
2157+
fn test_default_also_clamped_to_minimum_of_1() {
2158+
env::remove_var("SQS_TEST_QUEUE_POLLER_COUNT");
2159+
let result = ServerConfig::get_sqs_poller_count("TEST_QUEUE", 0);
2160+
assert_eq!(
2161+
result, 1,
2162+
"Default of 0 should also be clamped to minimum of 1"
2163+
);
2164+
}
2165+
}
20342166
}

0 commit comments

Comments
 (0)