Skip to content

Commit 1ab42a3

Browse files
feat: QUIC agent tunnel — protocol, listener, agent client
Add QUIC-based agent tunnel core infrastructure. Agents in private networks connect outbound to Gateway via QUIC/mTLS, advertise reachable subnets and domains, and proxy TCP connections on behalf of Gateway. Protocol (agent-tunnel-proto crate): - RouteAdvertise with subnets + domain advertisements - ConnectMessage/ConnectResponse for session stream setup - Heartbeat/HeartbeatAck for liveness detection - Protocol version negotiation (v2) Gateway (agent_tunnel module): - QUIC listener with mTLS authentication - Agent registry with subnet/domain tracking - Certificate authority for agent enrollment - Enrollment token store (one-time tokens) - Bidirectional proxy stream multiplexing Agent (devolutions-agent): - QUIC client with auto-reconnect and exponential backoff - Agent enrollment with config merge (preserves existing settings) - Domain auto-detection (Windows: USERDNSDOMAIN, Linux: resolv.conf) - Subnet validation on incoming connections - Certificate file permissions (0o600 on Unix) API endpoints: - POST /jet/agent-tunnel/enroll — agent enrollment - GET /jet/agent-tunnel/agents — list agents - GET /jet/agent-tunnel/agents/{id} — get agent - DELETE /jet/agent-tunnel/agents/{id} — delete agent - POST /jet/agent-tunnel/agents/resolve-target — routing diagnostics Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 372f08e commit 1ab42a3

36 files changed

+5142
-11
lines changed

Cargo.lock

Lines changed: 210 additions & 3 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
[package]
2+
name = "agent-tunnel-proto"
3+
version = "0.0.0"
4+
authors = ["Devolutions Inc. <infos@devolutions.net>"]
5+
edition = "2024"
6+
publish = false
7+
8+
[lints]
9+
workspace = true
10+
11+
[dependencies]
12+
bincode = "1.3"
13+
ipnetwork = "0.20"
14+
serde = { version = "1", features = ["derive"] }
15+
thiserror = "2.0"
16+
tokio = { version = "1.45", features = ["io-util"] }
17+
uuid = { version = "1.17", features = ["v4", "serde"] }
18+
19+
[dev-dependencies]
20+
proptest = "1.7"
21+
tokio = { version = "1.45", features = ["rt", "macros"] }
Lines changed: 301 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,301 @@
1+
use ipnetwork::Ipv4Network;
2+
use serde::{Deserialize, Serialize};
3+
use tokio::io::{AsyncRead, AsyncReadExt as _, AsyncWrite, AsyncWriteExt as _};
4+
5+
use crate::error::ProtoError;
6+
use crate::version::CURRENT_PROTOCOL_VERSION;
7+
8+
/// Maximum encoded message size (1 MiB) to prevent denial-of-service via oversized frames.
9+
pub const MAX_CONTROL_MESSAGE_SIZE: u32 = 1024 * 1024;
10+
11+
/// A DNS domain advertisement with its source.
12+
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
13+
pub struct DomainAdvertisement {
14+
/// The DNS domain (e.g., "contoso.local").
15+
pub domain: String,
16+
/// Whether this domain was auto-detected (`true`) or explicitly configured (`false`).
17+
pub auto_detected: bool,
18+
}
19+
20+
/// Control-plane messages exchanged over the dedicated control stream (stream ID 0).
21+
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
22+
pub enum ControlMessage {
23+
/// Agent advertises subnets and domains it can reach.
24+
RouteAdvertise {
25+
protocol_version: u16,
26+
/// Monotonically increasing epoch within this agent process lifetime.
27+
epoch: u64,
28+
/// Reachable IPv4 subnets.
29+
subnets: Vec<Ipv4Network>,
30+
/// DNS domains this agent can resolve, with source tracking.
31+
domains: Vec<DomainAdvertisement>,
32+
},
33+
34+
/// Periodic liveness probe.
35+
Heartbeat {
36+
protocol_version: u16,
37+
/// Milliseconds since UNIX epoch (sender's wall clock).
38+
timestamp_ms: u64,
39+
/// Number of currently active proxy streams on this connection.
40+
active_stream_count: u32,
41+
},
42+
43+
/// Acknowledgement to a Heartbeat.
44+
HeartbeatAck {
45+
protocol_version: u16,
46+
/// Echoed timestamp from the corresponding Heartbeat.
47+
timestamp_ms: u64,
48+
},
49+
}
50+
51+
impl ControlMessage {
52+
/// Create a new RouteAdvertise with the current protocol version.
53+
pub fn route_advertise(epoch: u64, subnets: Vec<Ipv4Network>, domains: Vec<DomainAdvertisement>) -> Self {
54+
Self::RouteAdvertise {
55+
protocol_version: CURRENT_PROTOCOL_VERSION,
56+
epoch,
57+
subnets,
58+
domains,
59+
}
60+
}
61+
62+
/// Create a new Heartbeat with the current protocol version.
63+
pub fn heartbeat(timestamp_ms: u64, active_stream_count: u32) -> Self {
64+
Self::Heartbeat {
65+
protocol_version: CURRENT_PROTOCOL_VERSION,
66+
timestamp_ms,
67+
active_stream_count,
68+
}
69+
}
70+
71+
/// Create a new HeartbeatAck with the current protocol version.
72+
pub fn heartbeat_ack(timestamp_ms: u64) -> Self {
73+
Self::HeartbeatAck {
74+
protocol_version: CURRENT_PROTOCOL_VERSION,
75+
timestamp_ms,
76+
}
77+
}
78+
79+
/// Length-prefixed bincode encode and write to an async writer.
80+
pub async fn encode<W: AsyncWrite + Unpin>(&self, writer: &mut W) -> Result<(), ProtoError> {
81+
let payload = bincode::serialize(self)?;
82+
let len = u32::try_from(payload.len()).map_err(|_| ProtoError::MessageTooLarge {
83+
size: u32::MAX,
84+
max: MAX_CONTROL_MESSAGE_SIZE,
85+
})?;
86+
writer.write_all(&len.to_be_bytes()).await?;
87+
writer.write_all(&payload).await?;
88+
writer.flush().await?;
89+
Ok(())
90+
}
91+
92+
/// Read and decode a length-prefixed bincode message from an async reader.
93+
pub async fn decode<R: AsyncRead + Unpin>(reader: &mut R) -> Result<Self, ProtoError> {
94+
let mut len_buf = [0u8; 4];
95+
reader.read_exact(&mut len_buf).await?;
96+
let len = u32::from_be_bytes(len_buf);
97+
98+
if len > MAX_CONTROL_MESSAGE_SIZE {
99+
return Err(ProtoError::MessageTooLarge {
100+
size: len,
101+
max: MAX_CONTROL_MESSAGE_SIZE,
102+
});
103+
}
104+
105+
let mut payload = vec![0u8; len as usize];
106+
reader.read_exact(&mut payload).await?;
107+
let msg: Self = bincode::deserialize(&payload)?;
108+
Ok(msg)
109+
}
110+
111+
/// Extract the protocol version from any variant.
112+
pub fn protocol_version(&self) -> u16 {
113+
match self {
114+
Self::RouteAdvertise { protocol_version, .. }
115+
| Self::Heartbeat { protocol_version, .. }
116+
| Self::HeartbeatAck { protocol_version, .. } => *protocol_version,
117+
}
118+
}
119+
}
120+
121+
#[cfg(test)]
122+
mod tests {
123+
use super::*;
124+
125+
#[tokio::test]
126+
async fn roundtrip_route_advertise() {
127+
let msg = ControlMessage::route_advertise(
128+
42,
129+
vec![
130+
"10.0.0.0/8".parse().expect("valid CIDR"),
131+
"192.168.1.0/24".parse().expect("valid CIDR"),
132+
],
133+
vec![],
134+
);
135+
136+
let mut buf = Vec::new();
137+
msg.encode(&mut buf).await.expect("encode should succeed");
138+
139+
let decoded = ControlMessage::decode(&mut buf.as_slice())
140+
.await
141+
.expect("decode should succeed");
142+
143+
assert_eq!(msg, decoded);
144+
}
145+
146+
#[tokio::test]
147+
async fn roundtrip_route_advertise_with_domains() {
148+
let msg = ControlMessage::route_advertise(
149+
42,
150+
vec!["10.0.0.0/8".parse().expect("valid CIDR")],
151+
vec![
152+
DomainAdvertisement { domain: "contoso.local".to_owned(), auto_detected: false },
153+
DomainAdvertisement { domain: "finance.contoso.local".to_owned(), auto_detected: true },
154+
],
155+
);
156+
157+
let mut buf = Vec::new();
158+
msg.encode(&mut buf).await.expect("encode should succeed");
159+
160+
let decoded = ControlMessage::decode(&mut buf.as_slice())
161+
.await
162+
.expect("decode should succeed");
163+
164+
assert_eq!(msg, decoded);
165+
166+
match &decoded {
167+
ControlMessage::RouteAdvertise { domains, .. } => {
168+
assert_eq!(domains.len(), 2);
169+
assert_eq!(domains[0].domain, "contoso.local");
170+
assert!(!domains[0].auto_detected);
171+
assert_eq!(domains[1].domain, "finance.contoso.local");
172+
assert!(domains[1].auto_detected);
173+
}
174+
_ => panic!("expected RouteAdvertise"),
175+
}
176+
}
177+
178+
#[tokio::test]
179+
async fn roundtrip_route_advertise_empty_domains() {
180+
let msg = ControlMessage::route_advertise(
181+
1,
182+
vec!["192.168.1.0/24".parse().expect("valid CIDR")],
183+
vec![],
184+
);
185+
186+
let mut buf = Vec::new();
187+
msg.encode(&mut buf).await.expect("encode should succeed");
188+
189+
let decoded = ControlMessage::decode(&mut buf.as_slice())
190+
.await
191+
.expect("decode should succeed");
192+
193+
assert_eq!(msg, decoded);
194+
}
195+
196+
#[tokio::test]
197+
async fn roundtrip_heartbeat() {
198+
let msg = ControlMessage::heartbeat(1_700_000_000_000, 5);
199+
200+
let mut buf = Vec::new();
201+
msg.encode(&mut buf).await.expect("encode should succeed");
202+
203+
let decoded = ControlMessage::decode(&mut buf.as_slice())
204+
.await
205+
.expect("decode should succeed");
206+
207+
assert_eq!(msg, decoded);
208+
}
209+
210+
#[tokio::test]
211+
async fn roundtrip_heartbeat_ack() {
212+
let msg = ControlMessage::heartbeat_ack(1_700_000_000_000);
213+
214+
let mut buf = Vec::new();
215+
msg.encode(&mut buf).await.expect("encode should succeed");
216+
217+
let decoded = ControlMessage::decode(&mut buf.as_slice())
218+
.await
219+
.expect("decode should succeed");
220+
221+
assert_eq!(msg, decoded);
222+
}
223+
224+
#[tokio::test]
225+
async fn reject_oversized_message() {
226+
// Craft a length prefix that exceeds the maximum
227+
let bad_len = (MAX_CONTROL_MESSAGE_SIZE + 1).to_be_bytes();
228+
let mut buf = bad_len.to_vec();
229+
buf.extend_from_slice(&[0u8; 32]); // dummy payload
230+
231+
let result = ControlMessage::decode(&mut buf.as_slice()).await;
232+
assert!(result.is_err());
233+
}
234+
}
235+
236+
#[cfg(test)]
237+
mod proptests {
238+
use proptest::prelude::*;
239+
240+
use super::*;
241+
use crate::version::CURRENT_PROTOCOL_VERSION;
242+
243+
fn arb_ipv4_network() -> impl Strategy<Value = Ipv4Network> {
244+
(any::<[u8; 4]>(), 0u8..=32).prop_map(|(octets, prefix)| {
245+
let ip = std::net::Ipv4Addr::from(octets);
246+
// Use network() to normalize the address for the given prefix
247+
Ipv4Network::new(ip, prefix)
248+
.map(|n| Ipv4Network::new(n.network(), prefix).expect("normalized network should be valid"))
249+
.unwrap_or_else(|_| Ipv4Network::new(std::net::Ipv4Addr::UNSPECIFIED, 0).expect("0.0.0.0/0 is valid"))
250+
})
251+
}
252+
253+
fn arb_domain_advertisement() -> impl Strategy<Value = DomainAdvertisement> {
254+
("[a-z]{3,10}\\.[a-z]{2,5}", any::<bool>()).prop_map(|(domain, auto_detected)| {
255+
DomainAdvertisement { domain, auto_detected }
256+
})
257+
}
258+
259+
fn arb_control_message() -> impl Strategy<Value = ControlMessage> {
260+
prop_oneof![
261+
(
262+
any::<u64>(),
263+
proptest::collection::vec(arb_ipv4_network(), 0..50),
264+
proptest::collection::vec(arb_domain_advertisement(), 0..5),
265+
)
266+
.prop_map(|(epoch, subnets, domains)| {
267+
ControlMessage::RouteAdvertise {
268+
protocol_version: CURRENT_PROTOCOL_VERSION,
269+
epoch,
270+
subnets,
271+
domains,
272+
}
273+
}),
274+
(any::<u64>(), any::<u32>()).prop_map(|(timestamp_ms, active_stream_count)| {
275+
ControlMessage::Heartbeat {
276+
protocol_version: CURRENT_PROTOCOL_VERSION,
277+
timestamp_ms,
278+
active_stream_count,
279+
}
280+
}),
281+
any::<u64>().prop_map(|timestamp_ms| ControlMessage::HeartbeatAck {
282+
protocol_version: CURRENT_PROTOCOL_VERSION,
283+
timestamp_ms,
284+
}),
285+
]
286+
}
287+
288+
proptest! {
289+
#[test]
290+
fn control_message_roundtrip(msg in arb_control_message()) {
291+
let rt = tokio::runtime::Builder::new_current_thread().enable_all().build().expect("tokio runtime");
292+
rt.block_on(async {
293+
let mut buf = Vec::new();
294+
msg.encode(&mut buf).await.expect("encode should succeed");
295+
let decoded = ControlMessage::decode(&mut buf.as_slice()).await.expect("decode should succeed");
296+
prop_assert_eq!(msg, decoded);
297+
Ok(())
298+
})?;
299+
}
300+
}
301+
}
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
/// Protocol-level errors for the agent tunnel.
2+
#[derive(Debug, thiserror::Error)]
3+
pub enum ProtoError {
4+
#[error("unsupported protocol version {received} (supported: {min}..={max})")]
5+
UnsupportedVersion { received: u16, min: u16, max: u16 },
6+
7+
#[error("message too large: {size} bytes (max: {max})")]
8+
MessageTooLarge { size: u32, max: u32 },
9+
10+
#[error("bincode encode/decode error: {0}")]
11+
Bincode(#[from] bincode::Error),
12+
13+
#[error("I/O error: {0}")]
14+
Io(#[from] std::io::Error),
15+
}
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
//! Protocol definitions for the QUIC-based agent tunnel.
2+
//!
3+
//! This crate defines the binary protocol exchanged between Gateway and Agent
4+
//! over QUIC streams. All messages use length-prefixed bincode encoding and
5+
//! carry a `protocol_version` field for forward compatibility.
6+
//!
7+
//! ## Stream model
8+
//!
9+
//! - **Control stream** (QUIC stream 0): carries [`ControlMessage`] variants
10+
//! (route advertisements, heartbeats).
11+
//! - **Session streams** (QUIC streams 1..N): each stream proxies one TCP
12+
//! connection. The first message is a [`ConnectMessage`] from Gateway,
13+
//! followed by a [`ConnectResponse`] from Agent. After a successful
14+
//! response, raw TCP bytes flow bidirectionally.
15+
16+
pub mod control;
17+
pub mod error;
18+
pub mod session;
19+
pub mod version;
20+
21+
pub use control::{ControlMessage, DomainAdvertisement, MAX_CONTROL_MESSAGE_SIZE};
22+
pub use error::ProtoError;
23+
pub use session::{ConnectMessage, ConnectResponse, MAX_SESSION_MESSAGE_SIZE};
24+
pub use version::{CURRENT_PROTOCOL_VERSION, MIN_SUPPORTED_VERSION, validate_protocol_version};

0 commit comments

Comments
 (0)