Skip to content

Commit a8afecd

Browse files
committed
fix(trogon-gateway): fix claim-check always triggered due to async-nats race
When `retry_on_initial_connect` is set, async-nats returns the `Client` before the background connection task populates the `ServerInfo` watch channel, so `server_info().max_payload` is always 0 immediately after connect — collapsing the claim-check threshold to 0 and routing every payload through the object-store path. `trogon_nats::connect` always sets `retry_on_initial_connect`, making this race deterministic in production.

Workaround: read the limit from `TROGON_GATEWAY_NATS_MAX_PAYLOAD_BYTES` (default 1 MiB) instead of querying the server. The original line is kept as a comment pending resolution of the upstream race.

Tracking issue: #122

Also adds OTel instrumentation to `ClaimCheckPublisher::publish_event` and the Linear webhook handler so the claim-check path decision is observable in traces and structured logs, and adds 20 production fixture payloads (PII-sanitised) with integration tests for the Linear source.

Signed-off-by: Yordis Prieto <yordis.prieto@gmail.com>
1 parent 29026b9 commit a8afecd

10 files changed

+636
-3
lines changed

rsworkspace/crates/trogon-gateway/src/config.rs

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use std::fmt;
2+
use std::num::NonZeroUsize;
23
use std::path::Path;
34

45
use confique::Config;
@@ -20,6 +21,17 @@ use trogon_std::{NonZeroDuration, ZeroDuration};
2021

2122
use crate::source_status::SourceStatus;
2223

24+
/// Error value whose message states that a value must be greater than zero.
#[derive(Debug)]
struct ZeroNotAllowed;

// No underlying cause to expose, so the default `Error` methods suffice.
impl std::error::Error for ZeroNotAllowed {}

impl fmt::Display for ZeroNotAllowed {
    /// Writes the fixed, human-readable rejection reason.
    fn fmt(&self, out: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(out, "must be greater than zero")
    }
}
34+
2335
#[derive(Debug)]
2436
pub enum ConfigValidationError {
2537
InvalidField {
@@ -125,6 +137,10 @@ struct GatewayConfig {
125137
http_server: HttpServerConfig,
126138
#[config(nested)]
127139
nats: NatsConfigSection,
140+
// TODO: restore dynamic server-info negotiation once the async-nats race is resolved.
141+
// See the TODO in main.rs for the full explanation.
142+
#[config(env = "TROGON_GATEWAY_NATS_MAX_PAYLOAD_BYTES", default = 1_048_576)]
143+
nats_max_payload_bytes: usize,
128144
#[config(nested)]
129145
sources: SourcesConfig,
130146
}
@@ -307,6 +323,7 @@ pub struct ResolvedHttpServerConfig {
307323
pub struct ResolvedConfig {
308324
pub http_server: ResolvedHttpServerConfig,
309325
pub nats: trogon_nats::NatsConfig,
326+
pub nats_max_payload_bytes: NonZeroUsize,
310327
pub github: Option<trogon_source_github::GithubConfig>,
311328
pub discord: Option<trogon_source_discord::DiscordConfig>,
312329
pub slack: Option<trogon_source_slack::SlackConfig>,
@@ -356,6 +373,18 @@ fn resolve(cfg: GatewayConfig, nats_overrides: &NatsArgs) -> Result<ResolvedConf
356373
let linear = resolve_linear(cfg.sources.linear, &mut errors);
357374
let notion = resolve_notion(cfg.sources.notion, &mut errors);
358375

376+
let nats_max_payload_bytes = match NonZeroUsize::new(cfg.nats_max_payload_bytes) {
377+
Some(v) => v,
378+
None => {
379+
errors.push(ConfigValidationError::invalid(
380+
"nats",
381+
"nats_max_payload_bytes",
382+
ZeroNotAllowed,
383+
));
384+
return Err(ConfigError::Validation(errors));
385+
}
386+
};
387+
359388
if !errors.is_empty() {
360389
return Err(ConfigError::Validation(errors));
361390
}
@@ -365,6 +394,7 @@ fn resolve(cfg: GatewayConfig, nats_overrides: &NatsArgs) -> Result<ResolvedConf
365394
port: cfg.http_server.port,
366395
},
367396
nats,
397+
nats_max_payload_bytes,
368398
github,
369399
discord,
370400
slack,
@@ -2267,4 +2297,19 @@ timestamp_tolerance_secs = 0
22672297
let result = load(Some(f.path()));
22682298
assert!(matches!(result, Err(ConfigError::Load(_))));
22692299
}
2300+
2301+
// A zero `nats_max_payload_bytes` must be rejected with a validation error
// that mentions the offending field.
#[test]
fn nats_max_payload_bytes_zero_is_error() {
    let config_file = write_toml(
        r#"
nats_max_payload_bytes = 0

[sources.linear]
webhook_secret = "linear-secret"
"#,
    );

    let outcome = load(Some(config_file.path()));

    // Anything other than a validation failure counts as "field not reported".
    let mentions_field = match &outcome {
        Err(ConfigError::Validation(errs)) => errs
            .iter()
            .any(|e| e.contains("nats_max_payload_bytes")),
        _ => false,
    };
    assert!(mentions_field);
}
22702315
}

rsworkspace/crates/trogon-gateway/src/main.rs

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,29 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
5959
info!("trogon-gateway starting");
6060

6161
let nats = connect(&resolved.nats, NATS_CONNECT_TIMEOUT).await?;
62-
let max_payload = MaxPayload::from_server_limit(nats.server_info().max_payload);
62+
// TODO: restore the line below once the async-nats race is resolved.
63+
//
64+
// `trogon_nats::connect` always sets `retry_on_initial_connect`, which causes
65+
// async-nats to return the `Client` before the background connection task has
66+
// populated the `ServerInfo` watch channel. Reading `server_info().max_payload`
67+
// immediately after `connect()` therefore returns 0 (the `Default` value for
68+
// `usize`), which collapses `MaxPayload::from_server_limit` to a threshold of 0
69+
// and routes every payload — including tiny ones — through the object-store
70+
// claim-check path.
71+
//
72+
// Until async-nats guarantees `server_info()` is populated before returning the
73+
// `Client` (or `trogon_nats::connect` exposes a post-connection hook), the value
74+
// is taken from config (`TROGON_GATEWAY_NATS_MAX_PAYLOAD_BYTES`, default 1 MiB).
75+
//
76+
// Tracking issue: https://github.com/TrogonStack/trogonai/issues/122
77+
// let server_max_payload = nats.server_info().max_payload;
78+
let server_max_payload = resolved.nats_max_payload_bytes.get();
79+
let max_payload = MaxPayload::from_server_limit(server_max_payload);
80+
info!(
81+
server_max_payload_bytes = server_max_payload,
82+
claim_check_threshold_bytes = max_payload.threshold(),
83+
"NATS connected"
84+
);
6385
let js_context = async_nats::jetstream::new(nats.clone());
6486
let object_store = NatsObjectStore::provision(
6587
&js_context,

rsworkspace/crates/trogon-nats/src/jetstream/claim_check.rs

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use std::time::Duration;
33

44
use async_nats::HeaderMap;
55
use bytes::Bytes;
6-
use tracing::error;
6+
use tracing::{debug, error};
77

88
use super::object_store::{ObjectStoreGet, ObjectStorePut};
99
use super::publish::PublishOutcome;
@@ -144,7 +144,17 @@ impl<P: JetStreamPublisher, S: ObjectStorePut> ClaimCheckPublisher<P, S> {
144144
payload: Bytes,
145145
ack_timeout: Duration,
146146
) -> PublishOutcome<P::PublishError> {
147-
if payload.len() <= self.max_payload.threshold() {
147+
let payload_bytes = payload.len();
148+
let threshold = self.max_payload.threshold();
149+
150+
if payload_bytes <= threshold {
151+
debug!(
152+
nats.subject = %subject,
153+
messaging.message.body.size = payload_bytes,
154+
trogon.claim_check.threshold_bytes = threshold,
155+
trogon.claim_check.used = false,
156+
"publishing directly"
157+
);
148158
return super::publish::publish_event(
149159
&self.publisher,
150160
subject,
@@ -157,6 +167,15 @@ impl<P: JetStreamPublisher, S: ObjectStorePut> ClaimCheckPublisher<P, S> {
157167

158168
let key = claim_object_key(&subject);
159169

170+
debug!(
171+
nats.subject = %subject,
172+
messaging.message.body.size = payload_bytes,
173+
trogon.claim_check.threshold_bytes = threshold,
174+
trogon.claim_check.used = true,
175+
trogon.claim_check.key = %key,
176+
"payload exceeds threshold, storing in object store"
177+
);
178+
160179
// Store-then-publish: if publish fails, the object becomes orphaned.
161180
// Cleanup relies on the object store bucket's TTL — see #101.
162181
let mut cursor = std::io::Cursor::new(payload);

rsworkspace/crates/trogon-source-linear/src/server.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,7 @@ fn handle_webhook<P: JetStreamPublisher, S: ObjectStorePut>(
133133
action = tracing::field::Empty,
134134
webhook_id = tracing::field::Empty,
135135
subject = tracing::field::Empty,
136+
messaging.message.body.size = tracing::field::Empty,
136137
)
137138
)]
138139
async fn handle_webhook_inner<P: JetStreamPublisher, S: ObjectStorePut>(
@@ -264,6 +265,7 @@ async fn handle_webhook_inner<P: JetStreamPublisher, S: ObjectStorePut>(
264265
span.record("action", action.as_str());
265266
span.record("webhook_id", &webhook_id);
266267
span.record("subject", &subject);
268+
span.record("messaging.message.body.size", body.len());
267269

268270
let mut nats_headers = async_nats::HeaderMap::new();
269271
nats_headers.insert("Nats-Msg-Id", webhook_id.as_str());
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
{
2+
"action": "create",
3+
"actor": {
4+
"id": "b915a192-d677-4da7-9b26-d10794c63c28",
5+
"name": "Test User",
6+
"email": "test.user@example.invalid",
7+
"avatarUrl": "https://example.invalid/avatar.png",
8+
"url": "https://example.invalid/profiles/test-user",
9+
"type": "user"
10+
},
11+
"createdAt": "2026-04-13T00:18:39.406Z",
12+
"data": {
13+
"id": "d8589dfa-7102-42a3-8162-dd8891110d3b",
14+
"createdAt": "2026-04-13T00:18:39.406Z",
15+
"updatedAt": "2026-04-13T00:18:39.398Z",
16+
"body": "asdsa",
17+
"issueId": "eb9aa286-ee58-4089-82c7-91245745cd11",
18+
"userId": "b915a192-d677-4da7-9b26-d10794c63c28",
19+
"reactionData": [],
20+
"isArtificialAgentSessionRoot": false,
21+
"botActor": null,
22+
"user": {
23+
"id": "b915a192-d677-4da7-9b26-d10794c63c28",
24+
"name": "Test User",
25+
"email": "test.user@example.invalid",
26+
"avatarUrl": "https://example.invalid/avatar.png",
27+
"url": "https://example.invalid/profiles/test-user"
28+
},
29+
"issue": {
30+
"id": "eb9aa286-ee58-4089-82c7-91245745cd11",
31+
"title": "Demo 2",
32+
"teamId": "eafa5651-df81-4e77-b369-52cc26127db3",
33+
"team": {
34+
"id": "eafa5651-df81-4e77-b369-52cc26127db3",
35+
"key": "STM",
36+
"name": "Test Team"
37+
},
38+
"identifier": "STM-14",
39+
"url": "https://example.invalid/issue/STM-14/demo-2"
40+
}
41+
},
42+
"url": "https://example.invalid/issue/STM-14/demo-2#comment-d8589dfa",
43+
"type": "Comment",
44+
"organizationId": "315a704e-7d8a-4db3-90ee-941d5d8cbe8b",
45+
"webhookTimestamp": 1776039519669,
46+
"webhookId": "4849cf83-0c00-4bb6-99ea-0dd330cd5f0d"
47+
}
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
{
2+
"action": "create",
3+
"actor": {
4+
"id": "b915a192-d677-4da7-9b26-d10794c63c28",
5+
"name": "Test User",
6+
"email": "test.user@example.invalid",
7+
"avatarUrl": "https://example.invalid/avatar.png",
8+
"url": "https://example.invalid/profiles/test-user",
9+
"type": "user"
10+
},
11+
"createdAt": "2026-04-12T05:13:18.613Z",
12+
"data": {
13+
"id": "eb9aa286-ee58-4089-82c7-91245745cd11",
14+
"createdAt": "2026-04-12T05:13:18.613Z",
15+
"updatedAt": "2026-04-12T05:13:18.613Z",
16+
"number": 14,
17+
"title": "Demo 2",
18+
"priority": 0,
19+
"sortOrder": 33,
20+
"prioritySortOrder": 43.65,
21+
"slaType": "all",
22+
"addedToTeamAt": "2026-04-12T05:13:18.629Z",
23+
"labelIds": [],
24+
"teamId": "eafa5651-df81-4e77-b369-52cc26127db3",
25+
"previousIdentifiers": [],
26+
"creatorId": "b915a192-d677-4da7-9b26-d10794c63c28",
27+
"stateId": "f2760411-fb81-44ff-99b3-4b8d34200bd9",
28+
"reactionData": [],
29+
"priorityLabel": "No priority",
30+
"inheritsSharedAccess": false,
31+
"botActor": null,
32+
"identifier": "STM-14",
33+
"url": "https://example.invalid/issue/STM-14/demo-2",
34+
"subscriberIds": [
35+
"b915a192-d677-4da7-9b26-d10794c63c28"
36+
],
37+
"state": {
38+
"id": "f2760411-fb81-44ff-99b3-4b8d34200bd9",
39+
"color": "#bec2c8",
40+
"name": "Backlog",
41+
"type": "backlog"
42+
},
43+
"team": {
44+
"id": "eafa5651-df81-4e77-b369-52cc26127db3",
45+
"key": "STM",
46+
"name": "Test Team"
47+
},
48+
"labels": [],
49+
"description": "",
50+
"descriptionData": "{\"type\":\"doc\",\"content\":[{\"type\":\"paragraph\"}]}"
51+
},
52+
"url": "https://example.invalid/issue/STM-14/demo-2",
53+
"type": "Issue",
54+
"organizationId": "315a704e-7d8a-4db3-90ee-941d5d8cbe8b",
55+
"webhookTimestamp": 1775990870911,
56+
"webhookId": "4849cf83-0c00-4bb6-99ea-0dd330cd5f0d"
57+
}
Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
{
2+
"action": "update",
3+
"actor": {
4+
"id": "b915a192-d677-4da7-9b26-d10794c63c28",
5+
"name": "Test User",
6+
"email": "test.user@example.invalid",
7+
"avatarUrl": "https://example.invalid/avatar.png",
8+
"url": "https://example.invalid/profiles/test-user",
9+
"type": "user"
10+
},
11+
"createdAt": "2026-04-12T04:54:27.300Z",
12+
"data": {
13+
"id": "8ccf0ea8-03ae-4e4c-b570-d4b8228490e2",
14+
"createdAt": "2025-11-26T07:49:10.704Z",
15+
"updatedAt": "2026-04-12T04:54:27.299Z",
16+
"number": 13,
17+
"title": "Demo",
18+
"priority": 0,
19+
"sortOrder": -45.11,
20+
"prioritySortOrder": 94.09,
21+
"startedAt": "2025-11-26T07:54:03.531Z",
22+
"canceledAt": "2026-04-12T04:54:27.288Z",
23+
"slaType": "all",
24+
"addedToTeamAt": "2025-11-26T07:49:10.737Z",
25+
"labelIds": [
26+
"574ea29d-a4b3-488d-b006-eb58c9e5f856"
27+
],
28+
"teamId": "eafa5651-df81-4e77-b369-52cc26127db3",
29+
"previousIdentifiers": [],
30+
"creatorId": "b915a192-d677-4da7-9b26-d10794c63c28",
31+
"assigneeId": "b915a192-d677-4da7-9b26-d10794c63c28",
32+
"stateId": "3dbc33f5-20bd-473f-a8b4-512df0c7ba51",
33+
"reactionData": [],
34+
"priorityLabel": "No priority",
35+
"inheritsSharedAccess": false,
36+
"botActor": null,
37+
"identifier": "STM-13",
38+
"url": "https://example.invalid/issue/STM-13/demo",
39+
"subscriberIds": [
40+
"b915a192-d677-4da7-9b26-d10794c63c28"
41+
],
42+
"assignee": {
43+
"id": "b915a192-d677-4da7-9b26-d10794c63c28",
44+
"name": "Test User",
45+
"email": "test.user@example.invalid",
46+
"avatarUrl": "https://example.invalid/avatar.png",
47+
"url": "https://example.invalid/profiles/test-user"
48+
},
49+
"state": {
50+
"id": "3dbc33f5-20bd-473f-a8b4-512df0c7ba51",
51+
"color": "#95a2b3",
52+
"name": "Canceled",
53+
"type": "canceled"
54+
},
55+
"team": {
56+
"id": "eafa5651-df81-4e77-b369-52cc26127db3",
57+
"key": "STM",
58+
"name": "Test Team"
59+
},
60+
"labels": [
61+
{
62+
"id": "574ea29d-a4b3-488d-b006-eb58c9e5f856",
63+
"color": "#BB87FC",
64+
"name": "Feature"
65+
}
66+
],
67+
"description": "Berry nice, she is a 10",
68+
"descriptionData": "{\"type\":\"doc\",\"content\":[{\"type\":\"paragraph\",\"content\":[{\"type\":\"text\",\"text\":\"Berry nice, she is a 10\"}]}]}"
69+
},
70+
"updatedFrom": {
71+
"stateId": "c9e5e61e-3bac-448a-a7e1-25b2d8bc7cda",
72+
"sortOrder": 61.53,
73+
"updatedAt": "2026-04-12T04:54:26.357Z",
74+
"canceledAt": null,
75+
"completedAt": "2026-04-12T04:54:26.345Z"
76+
},
77+
"url": "https://example.invalid/issue/STM-13/demo",
78+
"type": "Issue",
79+
"organizationId": "315a704e-7d8a-4db3-90ee-941d5d8cbe8b",
80+
"webhookTimestamp": 1775990847284,
81+
"webhookId": "4849cf83-0c00-4bb6-99ea-0dd330cd5f0d"
82+
}

0 commit comments

Comments
 (0)