Skip to content

Commit a61aef3

Browse files
feat: QUIC agent tunnel — protocol, listener, agent client
Add QUIC-based agent tunnel core infrastructure. Agents in private networks connect outbound to Gateway via QUIC/mTLS, advertise reachable subnets and domains, and proxy TCP connections on behalf of Gateway. Protocol (agent-tunnel-proto crate): - RouteAdvertise with subnets + domain advertisements - ConnectMessage/ConnectResponse for session stream setup - Heartbeat/HeartbeatAck for liveness detection - Protocol version negotiation (v2) Gateway (agent_tunnel module): - QUIC listener with mTLS authentication - Agent registry with subnet/domain tracking - Certificate authority for agent enrollment - Enrollment token store (one-time tokens) - Bidirectional proxy stream multiplexing Agent (devolutions-agent): - QUIC client with auto-reconnect and exponential backoff - Agent enrollment with config merge (preserves existing settings) - Domain auto-detection (Windows: USERDNSDOMAIN, Linux: resolv.conf) - Subnet validation on incoming connections - Certificate file permissions (0o600 on Unix) API endpoints: - POST /jet/agent-tunnel/enroll — agent enrollment - GET /jet/agent-tunnel/agents — list agents - GET /jet/agent-tunnel/agents/{id} — get agent - DELETE /jet/agent-tunnel/agents/{id} — delete agent - POST /jet/agent-tunnel/agents/resolve-target — routing diagnostics Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent e4052b2 commit a61aef3

35 files changed

Lines changed: 4344 additions & 96 deletions

Cargo.lock

Lines changed: 306 additions & 87 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
[package]
2+
name = "agent-tunnel-proto"
3+
version = "0.0.0"
4+
authors = ["Devolutions Inc. <infos@devolutions.net>"]
5+
edition = "2024"
6+
publish = false
7+
8+
[lints]
9+
workspace = true
10+
11+
[dependencies]
12+
bincode = "1.3"
13+
ipnetwork = "0.20"
14+
serde = { version = "1", features = ["derive"] }
15+
thiserror = "2.0"
16+
tokio = { version = "1.45", features = ["io-util"] }
17+
uuid = { version = "1.17", features = ["v4", "serde"] }
18+
19+
[dev-dependencies]
20+
proptest = "1.7"
21+
tokio = { version = "1.45", features = ["rt", "macros"] }
Lines changed: 308 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,308 @@
1+
use ipnetwork::Ipv4Network;
2+
use serde::{Deserialize, Serialize};
3+
use tokio::io::{AsyncRead, AsyncReadExt as _, AsyncWrite, AsyncWriteExt as _};
4+
5+
use crate::error::ProtoError;
6+
use crate::version::CURRENT_PROTOCOL_VERSION;
7+
8+
/// Maximum encoded message size (1 MiB) to prevent denial-of-service via oversized frames.
9+
pub const MAX_CONTROL_MESSAGE_SIZE: u32 = 1024 * 1024;
10+
11+
/// A DNS domain advertisement with its source.
12+
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
13+
pub struct DomainAdvertisement {
14+
/// The DNS domain (e.g., "contoso.local").
15+
pub domain: String,
16+
/// Whether this domain was auto-detected (`true`) or explicitly configured (`false`).
17+
pub auto_detected: bool,
18+
}
19+
20+
/// Control-plane messages exchanged over the dedicated control stream (stream ID 0).
21+
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
22+
pub enum ControlMessage {
23+
/// Agent advertises subnets and domains it can reach.
24+
RouteAdvertise {
25+
protocol_version: u16,
26+
/// Monotonically increasing epoch within this agent process lifetime.
27+
epoch: u64,
28+
/// Reachable IPv4 subnets.
29+
subnets: Vec<Ipv4Network>,
30+
/// DNS domains this agent can resolve, with source tracking.
31+
domains: Vec<DomainAdvertisement>,
32+
},
33+
34+
/// Periodic liveness probe.
35+
Heartbeat {
36+
protocol_version: u16,
37+
/// Milliseconds since UNIX epoch (sender's wall clock).
38+
timestamp_ms: u64,
39+
/// Number of currently active proxy streams on this connection.
40+
active_stream_count: u32,
41+
},
42+
43+
/// Acknowledgement to a Heartbeat.
44+
HeartbeatAck {
45+
protocol_version: u16,
46+
/// Echoed timestamp from the corresponding Heartbeat.
47+
timestamp_ms: u64,
48+
},
49+
}
50+
51+
impl ControlMessage {
52+
/// Create a new RouteAdvertise with the current protocol version.
53+
pub fn route_advertise(epoch: u64, subnets: Vec<Ipv4Network>, domains: Vec<DomainAdvertisement>) -> Self {
54+
Self::RouteAdvertise {
55+
protocol_version: CURRENT_PROTOCOL_VERSION,
56+
epoch,
57+
subnets,
58+
domains,
59+
}
60+
}
61+
62+
/// Create a new Heartbeat with the current protocol version.
63+
pub fn heartbeat(timestamp_ms: u64, active_stream_count: u32) -> Self {
64+
Self::Heartbeat {
65+
protocol_version: CURRENT_PROTOCOL_VERSION,
66+
timestamp_ms,
67+
active_stream_count,
68+
}
69+
}
70+
71+
/// Create a new HeartbeatAck with the current protocol version.
72+
pub fn heartbeat_ack(timestamp_ms: u64) -> Self {
73+
Self::HeartbeatAck {
74+
protocol_version: CURRENT_PROTOCOL_VERSION,
75+
timestamp_ms,
76+
}
77+
}
78+
79+
/// Length-prefixed bincode encode and write to an async writer.
80+
pub async fn encode<W: AsyncWrite + Unpin>(&self, writer: &mut W) -> Result<(), ProtoError> {
81+
let payload = bincode::serialize(self)?;
82+
let len = u32::try_from(payload.len()).map_err(|_| ProtoError::MessageTooLarge {
83+
size: u32::MAX,
84+
max: MAX_CONTROL_MESSAGE_SIZE,
85+
})?;
86+
if MAX_CONTROL_MESSAGE_SIZE < len {
87+
return Err(ProtoError::MessageTooLarge {
88+
size: len,
89+
max: MAX_CONTROL_MESSAGE_SIZE,
90+
});
91+
}
92+
writer.write_all(&len.to_be_bytes()).await?;
93+
writer.write_all(&payload).await?;
94+
writer.flush().await?;
95+
Ok(())
96+
}
97+
98+
/// Read and decode a length-prefixed bincode message from an async reader.
99+
pub async fn decode<R: AsyncRead + Unpin>(reader: &mut R) -> Result<Self, ProtoError> {
100+
let mut len_buf = [0u8; 4];
101+
reader.read_exact(&mut len_buf).await?;
102+
let len = u32::from_be_bytes(len_buf);
103+
104+
if MAX_CONTROL_MESSAGE_SIZE < len {
105+
return Err(ProtoError::MessageTooLarge {
106+
size: len,
107+
max: MAX_CONTROL_MESSAGE_SIZE,
108+
});
109+
}
110+
111+
let mut payload = vec![0u8; len as usize];
112+
reader.read_exact(&mut payload).await?;
113+
let msg: Self = bincode::deserialize(&payload)?;
114+
Ok(msg)
115+
}
116+
117+
/// Extract the protocol version from any variant.
118+
pub fn protocol_version(&self) -> u16 {
119+
match self {
120+
Self::RouteAdvertise { protocol_version, .. }
121+
| Self::Heartbeat { protocol_version, .. }
122+
| Self::HeartbeatAck { protocol_version, .. } => *protocol_version,
123+
}
124+
}
125+
}
126+
127+
#[cfg(test)]
128+
mod tests {
129+
use super::*;
130+
131+
#[tokio::test]
132+
async fn roundtrip_route_advertise() {
133+
let msg = ControlMessage::route_advertise(
134+
42,
135+
vec![
136+
"10.0.0.0/8".parse().expect("valid CIDR"),
137+
"192.168.1.0/24".parse().expect("valid CIDR"),
138+
],
139+
vec![],
140+
);
141+
142+
let mut buf = Vec::new();
143+
msg.encode(&mut buf).await.expect("encode should succeed");
144+
145+
let decoded = ControlMessage::decode(&mut buf.as_slice())
146+
.await
147+
.expect("decode should succeed");
148+
149+
assert_eq!(msg, decoded);
150+
}
151+
152+
#[tokio::test]
153+
async fn roundtrip_route_advertise_with_domains() {
154+
let msg = ControlMessage::route_advertise(
155+
42,
156+
vec!["10.0.0.0/8".parse().expect("valid CIDR")],
157+
vec![
158+
DomainAdvertisement {
159+
domain: "contoso.local".to_owned(),
160+
auto_detected: false,
161+
},
162+
DomainAdvertisement {
163+
domain: "finance.contoso.local".to_owned(),
164+
auto_detected: true,
165+
},
166+
],
167+
);
168+
169+
let mut buf = Vec::new();
170+
msg.encode(&mut buf).await.expect("encode should succeed");
171+
172+
let decoded = ControlMessage::decode(&mut buf.as_slice())
173+
.await
174+
.expect("decode should succeed");
175+
176+
assert_eq!(msg, decoded);
177+
178+
match &decoded {
179+
ControlMessage::RouteAdvertise { domains, .. } => {
180+
assert_eq!(domains.len(), 2);
181+
assert_eq!(domains[0].domain, "contoso.local");
182+
assert!(!domains[0].auto_detected);
183+
assert_eq!(domains[1].domain, "finance.contoso.local");
184+
assert!(domains[1].auto_detected);
185+
}
186+
_ => panic!("expected RouteAdvertise"),
187+
}
188+
}
189+
190+
#[tokio::test]
191+
async fn roundtrip_route_advertise_empty_domains() {
192+
let msg = ControlMessage::route_advertise(1, vec!["192.168.1.0/24".parse().expect("valid CIDR")], vec![]);
193+
194+
let mut buf = Vec::new();
195+
msg.encode(&mut buf).await.expect("encode should succeed");
196+
197+
let decoded = ControlMessage::decode(&mut buf.as_slice())
198+
.await
199+
.expect("decode should succeed");
200+
201+
assert_eq!(msg, decoded);
202+
}
203+
204+
#[tokio::test]
205+
async fn roundtrip_heartbeat() {
206+
let msg = ControlMessage::heartbeat(1_700_000_000_000, 5);
207+
208+
let mut buf = Vec::new();
209+
msg.encode(&mut buf).await.expect("encode should succeed");
210+
211+
let decoded = ControlMessage::decode(&mut buf.as_slice())
212+
.await
213+
.expect("decode should succeed");
214+
215+
assert_eq!(msg, decoded);
216+
}
217+
218+
#[tokio::test]
219+
async fn roundtrip_heartbeat_ack() {
220+
let msg = ControlMessage::heartbeat_ack(1_700_000_000_000);
221+
222+
let mut buf = Vec::new();
223+
msg.encode(&mut buf).await.expect("encode should succeed");
224+
225+
let decoded = ControlMessage::decode(&mut buf.as_slice())
226+
.await
227+
.expect("decode should succeed");
228+
229+
assert_eq!(msg, decoded);
230+
}
231+
232+
#[tokio::test]
233+
async fn reject_oversized_message() {
234+
// Craft a length prefix that exceeds the maximum
235+
let bad_len = (MAX_CONTROL_MESSAGE_SIZE + 1).to_be_bytes();
236+
let mut buf = bad_len.to_vec();
237+
buf.extend_from_slice(&[0u8; 32]); // dummy payload
238+
239+
let result = ControlMessage::decode(&mut buf.as_slice()).await;
240+
assert!(result.is_err());
241+
}
242+
}
243+
244+
#[cfg(test)]
245+
mod proptests {
246+
use proptest::prelude::*;
247+
248+
use super::*;
249+
use crate::version::CURRENT_PROTOCOL_VERSION;
250+
251+
fn arb_ipv4_network() -> impl Strategy<Value = Ipv4Network> {
252+
(any::<[u8; 4]>(), 0u8..=32).prop_map(|(octets, prefix)| {
253+
let ip = std::net::Ipv4Addr::from(octets);
254+
// Use network() to normalize the address for the given prefix
255+
Ipv4Network::new(ip, prefix)
256+
.map(|n| Ipv4Network::new(n.network(), prefix).expect("normalized network should be valid"))
257+
.unwrap_or_else(|_| Ipv4Network::new(std::net::Ipv4Addr::UNSPECIFIED, 0).expect("0.0.0.0/0 is valid"))
258+
})
259+
}
260+
261+
fn arb_domain_advertisement() -> impl Strategy<Value = DomainAdvertisement> {
262+
("[a-z]{3,10}\\.[a-z]{2,5}", any::<bool>())
263+
.prop_map(|(domain, auto_detected)| DomainAdvertisement { domain, auto_detected })
264+
}
265+
266+
fn arb_control_message() -> impl Strategy<Value = ControlMessage> {
267+
prop_oneof![
268+
(
269+
any::<u64>(),
270+
proptest::collection::vec(arb_ipv4_network(), 0..50),
271+
proptest::collection::vec(arb_domain_advertisement(), 0..5),
272+
)
273+
.prop_map(|(epoch, subnets, domains)| {
274+
ControlMessage::RouteAdvertise {
275+
protocol_version: CURRENT_PROTOCOL_VERSION,
276+
epoch,
277+
subnets,
278+
domains,
279+
}
280+
}),
281+
(any::<u64>(), any::<u32>()).prop_map(|(timestamp_ms, active_stream_count)| {
282+
ControlMessage::Heartbeat {
283+
protocol_version: CURRENT_PROTOCOL_VERSION,
284+
timestamp_ms,
285+
active_stream_count,
286+
}
287+
}),
288+
any::<u64>().prop_map(|timestamp_ms| ControlMessage::HeartbeatAck {
289+
protocol_version: CURRENT_PROTOCOL_VERSION,
290+
timestamp_ms,
291+
}),
292+
]
293+
}
294+
295+
proptest! {
296+
#[test]
297+
fn control_message_roundtrip(msg in arb_control_message()) {
298+
let rt = tokio::runtime::Builder::new_current_thread().enable_all().build().expect("tokio runtime");
299+
rt.block_on(async {
300+
let mut buf = Vec::new();
301+
msg.encode(&mut buf).await.expect("encode should succeed");
302+
let decoded = ControlMessage::decode(&mut buf.as_slice()).await.expect("decode should succeed");
303+
prop_assert_eq!(msg, decoded);
304+
Ok(())
305+
})?;
306+
}
307+
}
308+
}
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
/// Protocol-level errors for the agent tunnel.
2+
#[derive(Debug, thiserror::Error)]
3+
pub enum ProtoError {
4+
#[error("unsupported protocol version {received} (supported: {min}..={max})")]
5+
UnsupportedVersion { received: u16, min: u16, max: u16 },
6+
7+
#[error("message too large: {size} bytes (max: {max})")]
8+
MessageTooLarge { size: u32, max: u32 },
9+
10+
#[error("bincode encode/decode error: {0}")]
11+
Bincode(#[from] bincode::Error),
12+
13+
#[error("I/O error: {0}")]
14+
Io(#[from] std::io::Error),
15+
}
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
//! Protocol definitions for the QUIC-based agent tunnel.
2+
//!
3+
//! This crate defines the binary protocol exchanged between Gateway and Agent
4+
//! over QUIC streams. All messages use length-prefixed bincode encoding and
5+
//! carry a `protocol_version` field for forward compatibility.
6+
//!
7+
//! ## Stream model
8+
//!
9+
//! - **Control stream** (QUIC stream 0): carries [`ControlMessage`] variants
10+
//! (route advertisements, heartbeats).
11+
//! - **Session streams** (QUIC streams 1..N): each stream proxies one TCP
12+
//! connection. The first message is a [`ConnectMessage`] from Gateway,
13+
//! followed by a [`ConnectResponse`] from Agent. After a successful
14+
//! response, raw TCP bytes flow bidirectionally.
15+
16+
pub mod control;
17+
pub mod error;
18+
pub mod session;
19+
pub mod version;
20+
21+
pub use control::{ControlMessage, DomainAdvertisement, MAX_CONTROL_MESSAGE_SIZE};
22+
pub use error::ProtoError;
23+
pub use session::{ConnectMessage, ConnectResponse, MAX_SESSION_MESSAGE_SIZE};
24+
pub use version::{CURRENT_PROTOCOL_VERSION, MIN_SUPPORTED_VERSION, validate_protocol_version};

0 commit comments

Comments
 (0)