Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 45 additions & 0 deletions rsworkspace/crates/trogon-gateway/src/config.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::fmt;
use std::num::NonZeroUsize;
use std::path::Path;

use confique::Config;
Expand All @@ -20,6 +21,17 @@ use trogon_std::{NonZeroDuration, ZeroDuration};

use crate::source_status::SourceStatus;

#[derive(Debug)]
struct ZeroNotAllowed;

impl fmt::Display for ZeroNotAllowed {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str("must be greater than zero")
}
}

impl std::error::Error for ZeroNotAllowed {}

#[derive(Debug)]
pub enum ConfigValidationError {
InvalidField {
Expand Down Expand Up @@ -125,6 +137,10 @@ struct GatewayConfig {
http_server: HttpServerConfig,
#[config(nested)]
nats: NatsConfigSection,
// TODO: restore dynamic server-info negotiation once the async-nats race is resolved.
// See the TODO in main.rs for the full explanation.
#[config(env = "TROGON_GATEWAY_NATS_MAX_PAYLOAD_BYTES", default = 1_048_576)]
nats_max_payload_bytes: usize,
Comment thread
coderabbitai[bot] marked this conversation as resolved.
#[config(nested)]
sources: SourcesConfig,
}
Expand Down Expand Up @@ -307,6 +323,7 @@ pub struct ResolvedHttpServerConfig {
pub struct ResolvedConfig {
pub http_server: ResolvedHttpServerConfig,
pub nats: trogon_nats::NatsConfig,
pub nats_max_payload_bytes: NonZeroUsize,
pub github: Option<trogon_source_github::GithubConfig>,
pub discord: Option<trogon_source_discord::DiscordConfig>,
pub slack: Option<trogon_source_slack::SlackConfig>,
Expand Down Expand Up @@ -356,6 +373,18 @@ fn resolve(cfg: GatewayConfig, nats_overrides: &NatsArgs) -> Result<ResolvedConf
let linear = resolve_linear(cfg.sources.linear, &mut errors);
let notion = resolve_notion(cfg.sources.notion, &mut errors);

let nats_max_payload_bytes = match NonZeroUsize::new(cfg.nats_max_payload_bytes) {
Some(v) => v,
None => {
errors.push(ConfigValidationError::invalid(
"nats",
"nats_max_payload_bytes",
ZeroNotAllowed,
));
return Err(ConfigError::Validation(errors));
}
};

if !errors.is_empty() {
return Err(ConfigError::Validation(errors));
}
Expand All @@ -365,6 +394,7 @@ fn resolve(cfg: GatewayConfig, nats_overrides: &NatsArgs) -> Result<ResolvedConf
port: cfg.http_server.port,
},
nats,
nats_max_payload_bytes,
github,
discord,
slack,
Expand Down Expand Up @@ -2267,4 +2297,19 @@ timestamp_tolerance_secs = 0
let result = load(Some(f.path()));
assert!(matches!(result, Err(ConfigError::Load(_))));
}

#[test]
fn nats_max_payload_bytes_zero_is_error() {
let toml = r#"
nats_max_payload_bytes = 0

[sources.linear]
webhook_secret = "linear-secret"
"#;
let f = write_toml(toml);
let result = load(Some(f.path()));
assert!(
matches!(result, Err(ConfigError::Validation(ref errs)) if errs.iter().any(|e| e.contains("nats_max_payload_bytes")))
);
}
}
24 changes: 23 additions & 1 deletion rsworkspace/crates/trogon-gateway/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,29 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
info!("trogon-gateway starting");

let nats = connect(&resolved.nats, NATS_CONNECT_TIMEOUT).await?;
let max_payload = MaxPayload::from_server_limit(nats.server_info().max_payload);
// TODO: restore the line below once the async-nats race is resolved.
//
// `trogon_nats::connect` always sets `retry_on_initial_connect`, which causes
// async-nats to return the `Client` before the background connection task has
// populated the `ServerInfo` watch channel. Reading `server_info().max_payload`
// immediately after `connect()` therefore returns 0 (the `Default` value for
// `usize`), which collapses `MaxPayload::from_server_limit` to a threshold of 0
// and routes every payload — including tiny ones — through the object-store
// claim-check path.
//
// Until async-nats guarantees `server_info()` is populated before returning the
// `Client` (or `trogon_nats::connect` exposes a post-connection hook), the value
// is taken from config (`TROGON_GATEWAY_NATS_MAX_PAYLOAD_BYTES`, default 1 MiB).
//
// Tracking issue: https://github.com/TrogonStack/trogonai/issues/122
// let server_max_payload = nats.server_info().max_payload;
let server_max_payload = resolved.nats_max_payload_bytes.get();
let max_payload = MaxPayload::from_server_limit(server_max_payload);
info!(
server_max_payload_bytes = server_max_payload,
claim_check_threshold_bytes = max_payload.threshold(),
"NATS connected"
);
let js_context = async_nats::jetstream::new(nats.clone());
let object_store = NatsObjectStore::provision(
&js_context,
Expand Down
23 changes: 21 additions & 2 deletions rsworkspace/crates/trogon-nats/src/jetstream/claim_check.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::time::Duration;

use async_nats::HeaderMap;
use bytes::Bytes;
use tracing::error;
use tracing::{debug, error};

use super::object_store::{ObjectStoreGet, ObjectStorePut};
use super::publish::PublishOutcome;
Expand Down Expand Up @@ -144,7 +144,17 @@ impl<P: JetStreamPublisher, S: ObjectStorePut> ClaimCheckPublisher<P, S> {
payload: Bytes,
ack_timeout: Duration,
) -> PublishOutcome<P::PublishError> {
if payload.len() <= self.max_payload.threshold() {
let payload_bytes = payload.len();
let threshold = self.max_payload.threshold();

if payload_bytes <= threshold {
debug!(
nats.subject = %subject,
messaging.message.body.size = payload_bytes,
trogon.claim_check.threshold_bytes = threshold,
trogon.claim_check.used = false,
"publishing directly"
Comment thread
cursor[bot] marked this conversation as resolved.
);
return super::publish::publish_event(
&self.publisher,
subject,
Expand All @@ -157,6 +167,15 @@ impl<P: JetStreamPublisher, S: ObjectStorePut> ClaimCheckPublisher<P, S> {

let key = claim_object_key(&subject);

debug!(
nats.subject = %subject,
messaging.message.body.size = payload_bytes,
trogon.claim_check.threshold_bytes = threshold,
trogon.claim_check.used = true,
trogon.claim_check.key = %key,
"payload exceeds threshold, storing in object store"
);

// Store-then-publish: if publish fails, the object becomes orphaned.
// Cleanup relies on the object store bucket's TTL — see #101.
let mut cursor = std::io::Cursor::new(payload);
Expand Down
2 changes: 2 additions & 0 deletions rsworkspace/crates/trogon-source-linear/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ fn handle_webhook<P: JetStreamPublisher, S: ObjectStorePut>(
action = tracing::field::Empty,
webhook_id = tracing::field::Empty,
subject = tracing::field::Empty,
messaging.message.body.size = tracing::field::Empty,
)
)]
async fn handle_webhook_inner<P: JetStreamPublisher, S: ObjectStorePut>(
Expand Down Expand Up @@ -264,6 +265,7 @@ async fn handle_webhook_inner<P: JetStreamPublisher, S: ObjectStorePut>(
span.record("action", action.as_str());
span.record("webhook_id", &webhook_id);
span.record("subject", &subject);
span.record("messaging.message.body.size", body.len());

let mut nats_headers = async_nats::HeaderMap::new();
nats_headers.insert("Nats-Msg-Id", webhook_id.as_str());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
{
"action": "create",
"actor": {
"id": "00000000-0000-4000-8000-000000000010",
"name": "Test User",
"email": "test.user@example.invalid",
"avatarUrl": "https://example.invalid/avatar.png",
"url": "https://example.invalid/profiles/test-user",
"type": "user"
},
"createdAt": "2026-04-13T00:18:39.406Z",
"data": {
"id": "00000000-0000-4000-8000-000000000030",
"createdAt": "2026-04-13T00:18:39.406Z",
"updatedAt": "2026-04-13T00:18:39.398Z",
"body": "Test comment body",
"issueId": "00000000-0000-4000-8000-000000000020",
"userId": "00000000-0000-4000-8000-000000000010",
"reactionData": [],
"isArtificialAgentSessionRoot": false,
"botActor": null,
"user": {
"id": "00000000-0000-4000-8000-000000000010",
"name": "Test User",
"email": "test.user@example.invalid",
"avatarUrl": "https://example.invalid/avatar.png",
"url": "https://example.invalid/profiles/test-user"
},
"issue": {
"id": "00000000-0000-4000-8000-000000000020",
"title": "Demo 2",
"teamId": "00000000-0000-4000-8000-000000000040",
"team": {
"id": "00000000-0000-4000-8000-000000000040",
"key": "STM",
"name": "Test Team"
},
"identifier": "STM-14",
"url": "https://example.invalid/issue/STM-14/demo-2"
}
},
"url": "https://example.invalid/issue/STM-14/demo-2#comment-00000000",
"type": "Comment",
"organizationId": "00000000-0000-4000-8000-000000000001",
"webhookTimestamp": 1776039519669,
"webhookId": "00000000-0000-4000-8000-000000000002"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
{
"action": "create",
"actor": {
"id": "00000000-0000-4000-8000-000000000010",
"name": "Test User",
"email": "test.user@example.invalid",
"avatarUrl": "https://example.invalid/avatar.png",
"url": "https://example.invalid/profiles/test-user",
"type": "user"
},
"createdAt": "2026-04-12T05:13:18.613Z",
"data": {
"id": "00000000-0000-4000-8000-000000000020",
"createdAt": "2026-04-12T05:13:18.613Z",
"updatedAt": "2026-04-12T05:13:18.613Z",
"number": 14,
"title": "Demo 2",
"priority": 0,
"sortOrder": 33,
"prioritySortOrder": 43.65,
"slaType": "all",
"addedToTeamAt": "2026-04-12T05:13:18.629Z",
"labelIds": [],
"teamId": "00000000-0000-4000-8000-000000000040",
"previousIdentifiers": [],
"creatorId": "00000000-0000-4000-8000-000000000010",
"stateId": "00000000-0000-4000-8000-000000000050",
"reactionData": [],
"priorityLabel": "No priority",
"inheritsSharedAccess": false,
"botActor": null,
"identifier": "STM-14",
"url": "https://example.invalid/issue/STM-14/demo-2",
"subscriberIds": [
"00000000-0000-4000-8000-000000000010"
],
"state": {
"id": "00000000-0000-4000-8000-000000000050",
"color": "#bec2c8",
"name": "Backlog",
"type": "backlog"
},
"team": {
"id": "00000000-0000-4000-8000-000000000040",
"key": "STM",
"name": "Test Team"
},
"labels": [],
"description": "",
"descriptionData": "{\"type\":\"doc\",\"content\":[{\"type\":\"paragraph\"}]}"
},
"url": "https://example.invalid/issue/STM-14/demo-2",
"type": "Issue",
"organizationId": "00000000-0000-4000-8000-000000000001",
"webhookTimestamp": 1775990870911,
"webhookId": "00000000-0000-4000-8000-000000000002"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
{
"action": "update",
"actor": {
"id": "00000000-0000-4000-8000-000000000010",
"name": "Test User",
"email": "test.user@example.invalid",
"avatarUrl": "https://example.invalid/avatar.png",
"url": "https://example.invalid/profiles/test-user",
"type": "user"
},
"createdAt": "2026-04-12T04:54:27.300Z",
"data": {
"id": "00000000-0000-4000-8000-000000000021",
"createdAt": "2025-11-26T07:49:10.704Z",
"updatedAt": "2026-04-12T04:54:27.299Z",
"number": 13,
"title": "Demo",
"priority": 0,
"sortOrder": -45.11,
"prioritySortOrder": 94.09,
"startedAt": "2025-11-26T07:54:03.531Z",
"canceledAt": "2026-04-12T04:54:27.288Z",
"slaType": "all",
"addedToTeamAt": "2025-11-26T07:49:10.737Z",
"labelIds": [
"00000000-0000-4000-8000-000000000060"
],
"teamId": "00000000-0000-4000-8000-000000000040",
"previousIdentifiers": [],
"creatorId": "00000000-0000-4000-8000-000000000010",
"assigneeId": "00000000-0000-4000-8000-000000000010",
"stateId": "00000000-0000-4000-8000-000000000051",
"reactionData": [],
"priorityLabel": "No priority",
"inheritsSharedAccess": false,
"botActor": null,
"identifier": "STM-13",
"url": "https://example.invalid/issue/STM-13/demo",
"subscriberIds": [
"00000000-0000-4000-8000-000000000010"
],
"assignee": {
"id": "00000000-0000-4000-8000-000000000010",
"name": "Test User",
"email": "test.user@example.invalid",
"avatarUrl": "https://example.invalid/avatar.png",
"url": "https://example.invalid/profiles/test-user"
},
"state": {
"id": "00000000-0000-4000-8000-000000000051",
"color": "#95a2b3",
"name": "Canceled",
"type": "canceled"
},
"team": {
"id": "00000000-0000-4000-8000-000000000040",
"key": "STM",
"name": "Test Team"
},
"labels": [
{
"id": "00000000-0000-4000-8000-000000000060",
"color": "#BB87FC",
"name": "Feature"
}
],
"description": "Test issue description",
"descriptionData": "{\"type\":\"doc\",\"content\":[{\"type\":\"paragraph\",\"content\":[{\"type\":\"text\",\"text\":\"Test issue description\"}]}]}"
},
"updatedFrom": {
"stateId": "00000000-0000-4000-8000-000000000053",
"sortOrder": 61.53,
"updatedAt": "2026-04-12T04:54:26.357Z",
"canceledAt": null,
"completedAt": "2026-04-12T04:54:26.345Z"
},
"url": "https://example.invalid/issue/STM-13/demo",
"type": "Issue",
"organizationId": "00000000-0000-4000-8000-000000000001",
"webhookTimestamp": 1775990847284,
"webhookId": "00000000-0000-4000-8000-000000000002"
}
Loading
Loading