Skip to content

Commit 813a073

Browse files
Merge pull request #15 from rohas-dev/feat/adapter-sqs
feat(adapter): introduce AWS adapter support for SQS and EventBridge, refactor existing SQS adapter, and update configuration handling
2 parents dfe2bf5 + f686e5b commit 813a073

35 files changed

Lines changed: 2255 additions & 115 deletions

File tree

Cargo.lock

Lines changed: 344 additions & 29 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ members = [
1313
"crates/rohas-adapters/adapter-nats",
1414
"crates/rohas-adapters/adapter-kafka",
1515
"crates/rohas-adapters/adapter-rabbitmq",
16-
"crates/rohas-adapters/adapter-sqs",
16+
"crates/rohas-adapters/adapter-aws",
1717
"crates/rohas-adapters/adapter-rocksdb",
1818
]
1919

@@ -70,6 +70,7 @@ async-nats = "0.45.0"
7070
rdkafka = { version = "0.38.0", features = ["cmake-build"] }
7171
lapin = "3.7.2"
7272
aws-sdk-sqs = "1.9"
73+
aws-sdk-eventbridge = "1.9"
7374

7475
# Watching / Hot reload
7576
notify = "8.2.0"
@@ -95,7 +96,7 @@ adapter-memory = { path = "crates/rohas-adapters/adapter-memory" }
9596
adapter-nats = { path = "crates/rohas-adapters/adapter-nats" }
9697
adapter-kafka = { path = "crates/rohas-adapters/adapter-kafka" }
9798
adapter-rabbitmq = { path = "crates/rohas-adapters/adapter-rabbitmq" }
98-
adapter-sqs = { path = "crates/rohas-adapters/adapter-sqs" }
99+
adapter-aws = { path = "crates/rohas-adapters/adapter-aws" }
99100
adapter-rocksdb = { path = "crates/rohas-adapters/adapter-rocksdb" }
100101
rohas-telemetry = { path = "crates/rohas-telemetry" }
101102

crates/rohas-adapters/adapter-sqs/Cargo.toml renamed to crates/rohas-adapters/adapter-aws/Cargo.toml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
[package]
2-
name = "adapter-sqs"
2+
name = "adapter-aws"
33
version = { workspace = true }
44
edition = { workspace = true }
55
authors = { workspace = true }
@@ -8,6 +8,8 @@ license = { workspace = true }
88
[dependencies]
99
tokio = { workspace = true }
1010
aws-sdk-sqs = { workspace = true }
11+
aws-sdk-eventbridge = "1.9"
12+
aws-config = "1.1"
1113
serde = { workspace = true }
1214
serde_json = { workspace = true }
1315
thiserror = { workspace = true }
Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
use async_trait::async_trait;
2+
use serde::{Deserialize, Serialize};
3+
use std::collections::HashMap;
4+
use thiserror::Error;
5+
6+
pub type Result<T> = std::result::Result<T, AdapterError>;
7+
8+
#[derive(Error, Debug)]
9+
pub enum AdapterError {
10+
#[error("AWS SQS error: {0}")]
11+
AwsSqs(String),
12+
13+
#[error("AWS EventBridge error: {0}")]
14+
AwsEventBridge(String),
15+
16+
#[error("Queue not found: {0}")]
17+
QueueNotFound(String),
18+
19+
#[error("Serialization error: {0}")]
20+
Serialization(#[from] serde_json::Error),
21+
22+
#[error("Invalid message format: {0}")]
23+
InvalidMessage(String),
24+
25+
#[error("Configuration error: {0}")]
26+
Configuration(String),
27+
}
28+
29+
#[derive(Debug, Clone, Serialize, Deserialize)]
30+
pub struct Message {
31+
pub topic: String,
32+
pub payload: serde_json::Value,
33+
pub timestamp: String,
34+
pub metadata: HashMap<String, String>,
35+
}
36+
37+
impl Message {
38+
pub fn new(topic: impl Into<String>, payload: serde_json::Value) -> Self {
39+
use std::time::SystemTime;
40+
Self {
41+
topic: topic.into(),
42+
payload,
43+
timestamp: SystemTime::now()
44+
.duration_since(SystemTime::UNIX_EPOCH)
45+
.unwrap()
46+
.as_secs()
47+
.to_string(),
48+
metadata: HashMap::new(),
49+
}
50+
}
51+
52+
pub fn with_metadata(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
53+
self.metadata.insert(key.into(), value.into());
54+
self
55+
}
56+
}
57+
58+
#[async_trait]
59+
pub trait MessageHandler: Send + Sync {
60+
async fn handle(&self, message: Message) -> Result<()>;
61+
}
62+
63+
#[derive(Debug, Clone)]
64+
pub struct AwsConfig {
65+
pub region: String,
66+
pub queue_prefix: Option<String>, // For SQS
67+
pub event_bus_name: Option<String>, // For EventBridge (default: "default")
68+
pub source: Option<String>, // For EventBridge (default: "rohas")
69+
pub visibility_timeout_seconds: Option<i32>, // For SQS
70+
pub message_retention_seconds: Option<i32>, // For SQS
71+
pub receive_wait_time_seconds: Option<i32>, // For SQS (long polling)
72+
}
73+
74+
impl Default for AwsConfig {
75+
fn default() -> Self {
76+
Self {
77+
region: "us-east-1".to_string(),
78+
queue_prefix: Some("rohas-".to_string()),
79+
event_bus_name: None, // Use default event bus
80+
source: Some("rohas".to_string()),
81+
visibility_timeout_seconds: Some(30),
82+
message_retention_seconds: Some(345600), // 4 days
83+
receive_wait_time_seconds: Some(20), // Long polling
84+
}
85+
}
86+
}
87+

0 commit comments

Comments
 (0)