Skip to content

Commit 8f33467

Browse files
committed
feat(trogon-source-telegram): add Telegram webhook source adapter
- One-way webhook→NATS relay following the source-* crate pattern - Secret token validation via constant-time comparison - Update type extraction from JSON body for subject routing - JetStream stream provisioning, dedup via update_id, body size limits Signed-off-by: Yordis Prieto <yordis.prieto@gmail.com>
1 parent 7501c17 commit 8f33467

9 files changed

Lines changed: 1137 additions & 0 deletions

File tree

rsworkspace/Cargo.lock

Lines changed: 21 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

rsworkspace/crates/acp-telemetry/src/service_name.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ pub enum ServiceName {
1111
TrogonSourceGitlab,
1212
TrogonSourceLinear,
1313
TrogonSourceSlack,
14+
TrogonSourceTelegram,
1415
}
1516

1617
impl ServiceName {
@@ -24,6 +25,7 @@ impl ServiceName {
2425
Self::TrogonSourceGitlab => "trogon-source-gitlab",
2526
Self::TrogonSourceLinear => "trogon-source-linear",
2627
Self::TrogonSourceSlack => "trogon-source-slack",
28+
Self::TrogonSourceTelegram => "trogon-source-telegram",
2729
}
2830
}
2931
}
@@ -60,6 +62,10 @@ mod tests {
6062
ServiceName::TrogonSourceSlack.as_str(),
6163
"trogon-source-slack"
6264
);
65+
assert_eq!(
66+
ServiceName::TrogonSourceTelegram.as_str(),
67+
"trogon-source-telegram"
68+
);
6369
}
6470

6571
#[test]
@@ -84,5 +90,9 @@ mod tests {
8490
format!("{}", ServiceName::TrogonSourceSlack),
8591
"trogon-source-slack"
8692
);
93+
assert_eq!(
94+
format!("{}", ServiceName::TrogonSourceTelegram),
95+
"trogon-source-telegram"
96+
);
8797
}
8898
}
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
[package]
2+
name = "trogon-source-telegram"
3+
version = "0.1.0"
4+
edition = "2024"
5+
6+
[lints]
7+
workspace = true
8+
9+
[dependencies]
10+
acp-telemetry = { workspace = true }
11+
async-nats = { workspace = true, features = ["jetstream"] }
12+
axum = { workspace = true }
13+
bytes = { workspace = true }
14+
bytesize = "2.3.1"
15+
serde = { workspace = true }
16+
serde_json = { workspace = true }
17+
subtle = "2.6"
18+
tokio = { workspace = true, features = ["full"] }
19+
tower-http = { workspace = true, features = ["limit"] }
20+
tracing = { workspace = true }
21+
trogon-nats = { workspace = true }
22+
trogon-std = { workspace = true }
23+
24+
[dev-dependencies]
25+
trogon-nats = { workspace = true, features = ["test-support"] }
26+
trogon-std = { workspace = true, features = ["test-support"] }
27+
tower = "0.5"
28+
tracing-subscriber = { workspace = true }
Lines changed: 172 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,172 @@
1+
use std::time::Duration;
2+
3+
use bytesize::ByteSize;
4+
use trogon_nats::NatsConfig;
5+
use trogon_std::env::ReadEnv;
6+
7+
use crate::constants::{
8+
DEFAULT_MAX_BODY_SIZE, DEFAULT_NATS_ACK_TIMEOUT, DEFAULT_PORT, DEFAULT_STREAM_MAX_AGE,
9+
DEFAULT_STREAM_NAME, DEFAULT_SUBJECT_PREFIX,
10+
};
11+
12+
/// Configuration for the Telegram webhook source.
13+
///
14+
/// Resolved from environment variables:
15+
/// - `TELEGRAM_WEBHOOK_SECRET`: secret token configured via `setWebhook` (**required**)
16+
/// - `TELEGRAM_SOURCE_PORT`: HTTP listening port (default: 8080)
17+
/// - `TELEGRAM_SUBJECT_PREFIX`: NATS subject prefix (default: `telegram`)
18+
/// - `TELEGRAM_STREAM_NAME`: JetStream stream name (default: `TELEGRAM`)
19+
/// - `TELEGRAM_STREAM_MAX_AGE_SECS`: max age in seconds (default: 604800 / 7 days)
20+
/// - `TELEGRAM_NATS_ACK_TIMEOUT_SECS`: NATS ack timeout in seconds (default: 10)
21+
/// - `TELEGRAM_MAX_BODY_SIZE`: maximum body size in bytes (default: 10 MB)
22+
/// - Standard `NATS_*` variables for NATS connection (see `trogon-nats`)
23+
pub struct TelegramSourceConfig {
24+
pub webhook_secret: String,
25+
pub port: u16,
26+
pub subject_prefix: String,
27+
pub stream_name: String,
28+
pub stream_max_age: Duration,
29+
pub nats_ack_timeout: Duration,
30+
pub max_body_size: ByteSize,
31+
pub nats: NatsConfig,
32+
}
33+
34+
impl TelegramSourceConfig {
35+
pub fn from_env<E: ReadEnv>(env: &E) -> Self {
36+
Self {
37+
webhook_secret: env
38+
.var("TELEGRAM_WEBHOOK_SECRET")
39+
.ok()
40+
.filter(|s| !s.is_empty())
41+
.expect("TELEGRAM_WEBHOOK_SECRET is required"),
42+
port: env
43+
.var("TELEGRAM_SOURCE_PORT")
44+
.ok()
45+
.and_then(|p| p.parse().ok())
46+
.unwrap_or(DEFAULT_PORT),
47+
subject_prefix: env
48+
.var("TELEGRAM_SUBJECT_PREFIX")
49+
.unwrap_or_else(|_| DEFAULT_SUBJECT_PREFIX.to_string()),
50+
stream_name: env
51+
.var("TELEGRAM_STREAM_NAME")
52+
.unwrap_or_else(|_| DEFAULT_STREAM_NAME.to_string()),
53+
stream_max_age: env
54+
.var("TELEGRAM_STREAM_MAX_AGE_SECS")
55+
.ok()
56+
.and_then(|v| v.parse().ok())
57+
.map(Duration::from_secs)
58+
.unwrap_or(DEFAULT_STREAM_MAX_AGE),
59+
nats_ack_timeout: env
60+
.var("TELEGRAM_NATS_ACK_TIMEOUT_SECS")
61+
.ok()
62+
.and_then(|v| v.parse().ok())
63+
.map(Duration::from_secs)
64+
.unwrap_or(DEFAULT_NATS_ACK_TIMEOUT),
65+
max_body_size: env
66+
.var("TELEGRAM_MAX_BODY_SIZE")
67+
.ok()
68+
.and_then(|v| v.parse::<u64>().ok())
69+
.map(ByteSize)
70+
.unwrap_or(DEFAULT_MAX_BODY_SIZE),
71+
nats: NatsConfig::from_env(env),
72+
}
73+
}
74+
}
75+
76+
#[cfg(test)]
77+
mod tests {
78+
use super::*;
79+
use trogon_std::env::InMemoryEnv;
80+
81+
fn env_with_secret() -> InMemoryEnv {
82+
let env = InMemoryEnv::new();
83+
env.set("TELEGRAM_WEBHOOK_SECRET", "test-secret");
84+
env
85+
}
86+
87+
#[test]
88+
fn defaults_with_required_secret() {
89+
let env = env_with_secret();
90+
let config = TelegramSourceConfig::from_env(&env);
91+
92+
assert_eq!(config.webhook_secret, "test-secret");
93+
assert_eq!(config.port, 8080);
94+
assert_eq!(config.subject_prefix, "telegram");
95+
assert_eq!(config.stream_name, "TELEGRAM");
96+
assert_eq!(
97+
config.stream_max_age,
98+
Duration::from_secs(7 * 24 * 60 * 60)
99+
);
100+
assert_eq!(config.nats_ack_timeout, Duration::from_secs(10));
101+
assert_eq!(config.max_body_size, ByteSize::mib(10));
102+
}
103+
104+
#[test]
105+
fn reads_all_env_vars() {
106+
let env = InMemoryEnv::new();
107+
env.set("TELEGRAM_WEBHOOK_SECRET", "my-secret");
108+
env.set("TELEGRAM_SOURCE_PORT", "9090");
109+
env.set("TELEGRAM_SUBJECT_PREFIX", "tg");
110+
env.set("TELEGRAM_STREAM_NAME", "TG_EVENTS");
111+
env.set("TELEGRAM_STREAM_MAX_AGE_SECS", "3600");
112+
env.set("TELEGRAM_NATS_ACK_TIMEOUT_SECS", "30");
113+
env.set("TELEGRAM_MAX_BODY_SIZE", "1048576");
114+
115+
let config = TelegramSourceConfig::from_env(&env);
116+
117+
assert_eq!(config.webhook_secret, "my-secret");
118+
assert_eq!(config.port, 9090);
119+
assert_eq!(config.subject_prefix, "tg");
120+
assert_eq!(config.stream_name, "TG_EVENTS");
121+
assert_eq!(config.stream_max_age, Duration::from_secs(3600));
122+
assert_eq!(config.nats_ack_timeout, Duration::from_secs(30));
123+
assert_eq!(config.max_body_size, ByteSize::mib(1));
124+
}
125+
126+
#[test]
127+
#[should_panic(expected = "TELEGRAM_WEBHOOK_SECRET is required")]
128+
fn missing_webhook_secret_panics() {
129+
let env = InMemoryEnv::new();
130+
TelegramSourceConfig::from_env(&env);
131+
}
132+
133+
#[test]
134+
#[should_panic(expected = "TELEGRAM_WEBHOOK_SECRET is required")]
135+
fn empty_webhook_secret_panics() {
136+
let env = InMemoryEnv::new();
137+
env.set("TELEGRAM_WEBHOOK_SECRET", "");
138+
TelegramSourceConfig::from_env(&env);
139+
}
140+
141+
#[test]
142+
fn invalid_port_falls_back_to_default() {
143+
let env = env_with_secret();
144+
env.set("TELEGRAM_SOURCE_PORT", "not-a-number");
145+
let config = TelegramSourceConfig::from_env(&env);
146+
assert_eq!(config.port, 8080);
147+
}
148+
149+
#[test]
150+
fn invalid_max_age_falls_back_to_default() {
151+
let env = env_with_secret();
152+
env.set("TELEGRAM_STREAM_MAX_AGE_SECS", "not-a-number");
153+
let config = TelegramSourceConfig::from_env(&env);
154+
assert_eq!(config.stream_max_age, DEFAULT_STREAM_MAX_AGE);
155+
}
156+
157+
#[test]
158+
fn invalid_nats_ack_timeout_falls_back_to_default() {
159+
let env = env_with_secret();
160+
env.set("TELEGRAM_NATS_ACK_TIMEOUT_SECS", "not-a-number");
161+
let config = TelegramSourceConfig::from_env(&env);
162+
assert_eq!(config.nats_ack_timeout, DEFAULT_NATS_ACK_TIMEOUT);
163+
}
164+
165+
#[test]
166+
fn invalid_max_body_size_falls_back_to_default() {
167+
let env = env_with_secret();
168+
env.set("TELEGRAM_MAX_BODY_SIZE", "not-a-number");
169+
let config = TelegramSourceConfig::from_env(&env);
170+
assert_eq!(config.max_body_size, DEFAULT_MAX_BODY_SIZE);
171+
}
172+
}
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
use std::time::Duration;
2+
3+
use bytesize::ByteSize;
4+
5+
pub const DEFAULT_PORT: u16 = 8080;
6+
pub const DEFAULT_SUBJECT_PREFIX: &str = "telegram";
7+
pub const DEFAULT_STREAM_NAME: &str = "TELEGRAM";
8+
pub const DEFAULT_STREAM_MAX_AGE: Duration = Duration::from_secs(7 * 24 * 60 * 60); // 7 days
9+
pub const DEFAULT_NATS_ACK_TIMEOUT: Duration = Duration::from_secs(10);
10+
pub const DEFAULT_MAX_BODY_SIZE: ByteSize = ByteSize::mib(10);
11+
pub const DEFAULT_NATS_CONNECT_TIMEOUT: Duration = Duration::from_secs(10);
12+
13+
pub const HEADER_SECRET_TOKEN: &str = "x-telegram-bot-api-secret-token";
14+
15+
pub const NATS_HEADER_UPDATE_TYPE: &str = "X-Telegram-Update-Type";
16+
pub const NATS_HEADER_UPDATE_ID: &str = "X-Telegram-Update-Id";
17+
18+
pub const UPDATE_TYPES: &[&str] = &[
19+
"message",
20+
"edited_message",
21+
"channel_post",
22+
"edited_channel_post",
23+
"business_connection",
24+
"business_message",
25+
"edited_business_message",
26+
"deleted_business_messages",
27+
"message_reaction",
28+
"message_reaction_count",
29+
"inline_query",
30+
"chosen_inline_result",
31+
"callback_query",
32+
"shipping_query",
33+
"pre_checkout_query",
34+
"purchased_paid_media",
35+
"poll",
36+
"poll_answer",
37+
"my_chat_member",
38+
"chat_member",
39+
"chat_join_request",
40+
"chat_boost",
41+
"removed_chat_boost",
42+
"managed_bot",
43+
];
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
//! # trogon-source-telegram
2+
//!
3+
//! Telegram webhook receiver that publishes updates to NATS JetStream.
4+
//!
5+
//! ## How it works
6+
//!
7+
//! 1. Telegram sends `POST /webhook` with `X-Telegram-Bot-Api-Secret-Token`
8+
//! header plus a JSON `Update` payload.
9+
//! 2. The server validates the secret token against `TELEGRAM_WEBHOOK_SECRET`.
10+
//! 3. Updates are published to NATS JetStream on `telegram.{update_type}` subjects
11+
//! (e.g. `telegram.message`, `telegram.callback_query`).
12+
//! 4. The JetStream stream (`TELEGRAM` by default, capturing `telegram.>`) is
13+
//! created automatically on startup if it doesn't exist.
14+
//!
15+
//! ## NATS message format
16+
//!
17+
//! - **Subject**: `{TELEGRAM_SUBJECT_PREFIX}.{update_type}` (e.g. `telegram.message`)
18+
//! - **Headers**: `X-Telegram-Update-Type`, `X-Telegram-Update-Id`
19+
//! - **Payload**: raw JSON body from Telegram
20+
//!
21+
//! ## Configuration (env vars)
22+
//!
23+
//! | Variable | Default | Description |
24+
//! |---|---|---|
25+
//! | `TELEGRAM_WEBHOOK_SECRET` | **required** | Secret token configured via `setWebhook` |
26+
//! | `TELEGRAM_SOURCE_PORT` | `8080` | HTTP listening port |
27+
//! | `TELEGRAM_SUBJECT_PREFIX` | `telegram` | NATS subject prefix |
28+
//! | `TELEGRAM_STREAM_NAME` | `TELEGRAM` | JetStream stream name |
29+
//! | `TELEGRAM_STREAM_MAX_AGE_SECS` | `604800` | Max age of messages in JetStream (seconds, default 7 days) |
30+
//! | `TELEGRAM_NATS_ACK_TIMEOUT_SECS` | `10` | NATS publish ack timeout in seconds |
31+
//! | `TELEGRAM_MAX_BODY_SIZE` | `10485760` | Maximum webhook body size in bytes (default 10 MB) |
32+
//! | `NATS_URL` | `localhost:4222` | NATS server URL(s) |
33+
34+
pub mod config;
35+
pub mod constants;
36+
pub mod server;
37+
pub mod signature;
38+
39+
pub use config::TelegramSourceConfig;
40+
#[cfg(not(coverage))]
41+
pub use server::{ServeError, serve};
42+
pub use server::{provision, router};
43+
pub use signature::SignatureError;

0 commit comments

Comments
 (0)