Skip to content

Commit 7edce9c

Browse files
committed
wip: use roniker for LSP
1 parent 4b0c716 commit 7edce9c

13 files changed

Lines changed: 685 additions & 30 deletions

File tree

Cargo.lock

Lines changed: 361 additions & 5 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,9 @@ rand = "0.9.0"
5454
regex = "1.11.1"
5555
ron = "0.12.0"
5656
russh = { version = "0.52.0" }
57-
rustls = { version = "0.23.36", default-features = false, features = ["aws_lc_rs"] }
57+
rustls = { version = "0.23.36", default-features = false, features = [
58+
"aws_lc_rs",
59+
] }
5860
serde_bytes = "0.11.15"
5961
serde_cbor = "0.11.2"
6062
serde_json = "1.0.134"
@@ -68,6 +70,7 @@ tokio-stream = "0.1.17"
6870
tokio-util = "0.7.15"
6971
tokio = { version = "1.49.0", features = ["full"] }
7072
tower = "0.5.2"
73+
roniker = "0.1.0"
7174
tower-http = { version = "0.6.1", features = ["trace"] }
7275
tracing = "0.1.40"
7376
url = { version = "2.5.4", features = ["serde"] }

sandpolis-filesystem/src/session.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,7 @@ struct FsSessionStreamResponder {
9595
watcher: notify::RecommendedWatcher,
9696
}
9797

98+
#[cfg(feature = "agent")]
9899
impl StreamResponder for FsSessionStreamResponder {
99100
type In = FsSessionRequest;
100101
type Out = FsSessionResponse;

sandpolis-probe/Cargo.toml

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,13 +9,19 @@ version = "0.0.1"
99
anyhow = { workspace = true }
1010
serde = { workspace = true }
1111
macaddr = { workspace = true }
12+
tokio = { workspace = true }
13+
tracing = { workspace = true }
14+
url = { workspace = true }
15+
futures = { workspace = true }
1216
rups = { version = "0.6", optional = true }
1317
vnc = { version = "0.4", optional = true }
1418
russh = { workspace = true, optional = true }
1519
ipmi-rs = { version = "0.4.0", optional = true }
1620
rasn-snmp = { version = "0.26.6", optional = true }
1721
arp-toolkit = { version = "0.3.2", optional = true }
22+
retina = { version = "0.4", optional = true }
1823
sandpolis-instance = { path = "../sandpolis-instance", version = "0.0.1" }
24+
sandpolis-macros = { path = "../sandpolis-macros", version = "0.0.1" }
1925

2026
# GUI dependencies
2127
bevy = { workspace = true, optional = true }
@@ -26,7 +32,8 @@ inventory = { workspace = true, optional = true }
2632
sandpolis-client = { path = "../sandpolis-client", version = "0.0.1", optional = true }
2733

2834
[features]
29-
agent = ["dep:rups", "dep:vnc", "dep:russh"]
35+
agent = ["dep:rups", "dep:vnc", "dep:russh", "dep:retina"]
36+
server = ["dep:rups", "dep:vnc", "dep:russh", "dep:retina"]
3037
client-gui = [
3138
"dep:bevy",
3239
"dep:bevy_egui",

sandpolis-probe/src/lib.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@
66
77
use sandpolis_instance::InstanceId;
88
use serde::{Deserialize, Serialize};
9+
use std::collections::HashMap;
10+
use std::sync::{Arc, RwLock};
911

1012
pub mod config;
1113
pub mod docker;
@@ -24,6 +26,11 @@ pub mod wol;
2426
#[cfg(feature = "client-gui")]
2527
pub mod client;
2628

29+
/// The probe layer manages probe registrations and streaming state.
30+
#[derive(Clone)]
31+
#[cfg_attr(feature = "client-gui", derive(bevy::prelude::Resource))]
32+
pub struct ProbeLayer {}
33+
2734
/// An enumeration of all available probe types.
2835
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
2936
pub enum ProbeType {

sandpolis-probe/src/rtsp.rs

Lines changed: 0 additions & 10 deletions
This file was deleted.

sandpolis-probe/src/rtsp/mod.rs

Lines changed: 241 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,241 @@
1+
use anyhow::Result;
2+
use retina::client::{SessionGroup, SetupOptions};
3+
use retina::codec::CodecItem;
4+
use sandpolis_instance::network::StreamResponder;
5+
use sandpolis_macros::Stream;
6+
use serde::{Deserialize, Serialize};
7+
use std::sync::Arc;
8+
use tokio::sync::RwLock;
9+
use tokio::sync::mpsc::Sender;
10+
use tracing::debug;
11+
use url::Url;
12+
13+
#[derive(Serialize, Deserialize, Debug, Clone)]
14+
pub struct RtspConfig {
15+
pub port: u16,
16+
pub username: String,
17+
pub password: String,
18+
pub path: String,
19+
}
20+
21+
/// Request message for RTSP stream sessions.
22+
#[derive(Serialize, Deserialize)]
23+
pub enum RtspSessionStreamRequest {
24+
/// Start streaming from the given RTSP URL
25+
Start {
26+
/// Full RTSP URL (e.g., rtsp://user:pass@host:554/stream)
27+
url: String,
28+
29+
/// Transport protocol preference
30+
transport: RtspTransport,
31+
},
32+
/// Stop the stream
33+
Stop,
34+
}
35+
36+
/// Transport protocol for RTSP streaming.
37+
#[derive(Serialize, Deserialize, Clone, Copy, Debug, Default)]
38+
pub enum RtspTransport {
39+
/// UDP transport (lower latency, may have packet loss)
40+
Udp,
41+
/// TCP interleaved transport (more reliable)
42+
#[default]
43+
Tcp,
44+
}
45+
46+
/// Response message containing video/audio frame data.
47+
#[derive(Serialize, Deserialize)]
48+
pub struct RtspSessionStreamResponse {
49+
/// The stream index (0 for video, 1 for audio typically)
50+
pub stream_index: usize,
51+
52+
/// Frame data
53+
pub frame: RtspFrame,
54+
}
55+
56+
/// A single frame from the RTSP stream.
57+
#[derive(Serialize, Deserialize)]
58+
pub enum RtspFrame {
59+
/// H.264 video frame
60+
H264 {
61+
/// NAL units
62+
data: Vec<Vec<u8>>,
63+
/// Presentation timestamp in 90kHz units
64+
timestamp: i64,
65+
/// Whether this is a keyframe (IDR)
66+
is_keyframe: bool,
67+
},
68+
/// H.265/HEVC video frame
69+
H265 {
70+
/// NAL units
71+
data: Vec<Vec<u8>>,
72+
/// Presentation timestamp in 90kHz units
73+
timestamp: i64,
74+
/// Whether this is a keyframe
75+
is_keyframe: bool,
76+
},
77+
/// AAC audio frame
78+
Aac {
79+
/// Raw AAC data
80+
data: Vec<u8>,
81+
/// Presentation timestamp
82+
timestamp: i64,
83+
},
84+
/// G.711 audio frame
85+
G711 {
86+
/// Raw audio samples
87+
data: Vec<u8>,
88+
/// Presentation timestamp
89+
timestamp: i64,
90+
},
91+
/// Stream ended or error occurred
92+
End { reason: String },
93+
}
94+
95+
/// Stream responder that connects to an RTSP source and forwards frames.
96+
#[cfg(any(feature = "agent", feature = "server"))]
97+
#[derive(Stream, Default)]
98+
pub struct RtspSessionStreamResponder {
99+
/// Flag to signal the stream should stop
100+
stop_flag: Arc<RwLock<bool>>,
101+
}
102+
103+
#[cfg(any(feature = "agent", feature = "server"))]
104+
impl StreamResponder for RtspSessionStreamResponder {
105+
type In = RtspSessionStreamRequest;
106+
type Out = RtspSessionStreamResponse;
107+
108+
async fn on_message(&self, request: Self::In, sender: Sender<Self::Out>) -> Result<()> {
109+
match request {
110+
RtspSessionStreamRequest::Start { url, transport } => {
111+
// Reset stop flag
112+
*self.stop_flag.write().await = false;
113+
114+
// Parse the URL
115+
let parsed_url = Url::parse(&url)?;
116+
debug!(
117+
"Connecting to RTSP stream: {}",
118+
parsed_url.host_str().unwrap_or("unknown")
119+
);
120+
121+
// Create session options based on transport preference
122+
let session_group = Arc::new(SessionGroup::default());
123+
let mut session = retina::client::Session::describe(
124+
parsed_url,
125+
retina::client::SessionOptions::default().session_group(session_group),
126+
)
127+
.await?;
128+
129+
// Setup all streams
130+
for i in 0..session.streams().len() {
131+
let setup_options = match transport {
132+
RtspTransport::Udp => SetupOptions::default()
133+
.transport(retina::client::Transport::Udp(Default::default())),
134+
RtspTransport::Tcp => SetupOptions::default()
135+
.transport(retina::client::Transport::Tcp(Default::default())),
136+
};
137+
session.setup(i, setup_options).await?;
138+
}
139+
140+
// Start playing
141+
let mut session = session
142+
.play(retina::client::PlayOptions::default())
143+
.await?
144+
.demuxed()?;
145+
146+
let stop_flag = self.stop_flag.clone();
147+
148+
// Read frames in a loop
149+
loop {
150+
// Check stop flag
151+
if *stop_flag.read().await {
152+
let _ = sender
153+
.send(RtspSessionStreamResponse {
154+
stream_index: 0,
155+
frame: RtspFrame::End {
156+
reason: "Stopped by request".to_string(),
157+
},
158+
})
159+
.await;
160+
break;
161+
}
162+
163+
use futures::StreamExt;
164+
match session.next().await {
165+
Some(Ok(item)) => {
166+
let response = match item {
167+
CodecItem::VideoFrame(frame) => {
168+
let stream_id = frame.stream_id();
169+
let is_keyframe = frame.is_random_access_point();
170+
let timestamp = frame.timestamp().timestamp();
171+
let data = frame.into_data();
172+
173+
RtspSessionStreamResponse {
174+
stream_index: stream_id,
175+
frame: RtspFrame::H264 {
176+
data: vec![data],
177+
timestamp,
178+
is_keyframe,
179+
},
180+
}
181+
}
182+
CodecItem::AudioFrame(frame) => {
183+
let timestamp = frame.timestamp().timestamp();
184+
let data = frame.data().to_vec();
185+
186+
RtspSessionStreamResponse {
187+
stream_index: frame.stream_id(),
188+
frame: RtspFrame::Aac { data, timestamp },
189+
}
190+
}
191+
CodecItem::MessageFrame(_) => continue,
192+
_ => continue,
193+
};
194+
195+
if sender.send(response).await.is_err() {
196+
break;
197+
}
198+
}
199+
Some(Err(e)) => {
200+
let _ = sender
201+
.send(RtspSessionStreamResponse {
202+
stream_index: 0,
203+
frame: RtspFrame::End {
204+
reason: e.to_string(),
205+
},
206+
})
207+
.await;
208+
break;
209+
}
210+
None => {
211+
let _ = sender
212+
.send(RtspSessionStreamResponse {
213+
stream_index: 0,
214+
frame: RtspFrame::End {
215+
reason: "Stream ended".to_string(),
216+
},
217+
})
218+
.await;
219+
break;
220+
}
221+
}
222+
}
223+
}
224+
RtspSessionStreamRequest::Stop => {
225+
*self.stop_flag.write().await = true;
226+
}
227+
}
228+
Ok(())
229+
}
230+
}
231+
232+
#[cfg(any(feature = "agent", feature = "server"))]
233+
impl Drop for RtspSessionStreamResponder {
234+
fn drop(&mut self) {
235+
debug!("RTSP session responder dropped");
236+
// Signal stop in case the stream is still running
237+
if let Ok(mut flag) = self.stop_flag.try_write() {
238+
*flag = true;
239+
}
240+
}
241+
}

sandpolis/Cargo.toml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,9 @@ version = "8.0.0"
1010

1111
[build-dependencies]
1212
built = { version = "0.7", features = ["git2", "chrono", "semver"] }
13+
roniker = { workspace = true, features = ["analyze"] }
14+
serde_json = { workspace = true }
15+
walkdir = "2"
1316

1417
[dependencies]
1518
anyhow = { workspace = true }
@@ -45,6 +48,7 @@ tempfile = { workspace = true }
4548
tokio-stream = { workspace = true }
4649
tokio = { workspace = true }
4750
tower-http = { workspace = true }
51+
roniker = { workspace = true, optional = true, features = ["lsp"] }
4852
tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }
4953
tracing = { workspace = true }
5054
validator = { workspace = true }
@@ -98,6 +102,7 @@ agent = [
98102
"sandpolis-shell?/agent",
99103
]
100104
client = [
105+
"dep:roniker",
101106
"sandpolis-client/client",
102107
"sandpolis-instance/client",
103108
"sandpolis-server/client",

sandpolis/build.rs

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,32 @@
1-
use std::{env, path::PathBuf};
1+
use std::{env, fs, path::PathBuf};
22

33
fn main() {
4+
let out_dir = env::var("OUT_DIR").expect("OUT_DIR not set");
5+
6+
// Generate built.rs
47
if built::write_built_file().is_err() {
5-
let dest =
6-
std::path::Path::new(&env::var("OUT_DIR").expect("OUT_DIR not set")).join("built.rs");
8+
let dest = std::path::Path::new(&out_dir).join("built.rs");
79
built::write_built_file_with_opts(Some(&PathBuf::from("..")), &dest)
810
.expect("Failed to acquire build-time information");
911
}
12+
13+
// Generate rust_analyzer.json for the LSP
14+
let workspace_root = PathBuf::from(env::var("CARGO_MANIFEST_DIR").unwrap()).join("..");
15+
let mut analyzer = roniker::RustAnalyzer::new("crate::config::Configuration");
16+
17+
// Find and analyze all config.rs files in the workspace
18+
for entry in walkdir::WalkDir::new(&workspace_root)
19+
.into_iter()
20+
.filter_map(|e| e.ok())
21+
.filter(|e| e.file_type().is_file())
22+
.filter(|e| e.file_name() == "config.rs")
23+
{
24+
let _ = analyzer.add_file(entry.path());
25+
}
26+
27+
let json = serde_json::to_string(&analyzer).expect("Failed to serialize RustAnalyzer");
28+
let dest = PathBuf::from(&out_dir).join("rust_analyzer.json");
29+
fs::write(&dest, json).expect("Failed to write rust_analyzer.json");
30+
31+
println!("cargo:rerun-if-changed=../");
1032
}

0 commit comments

Comments
 (0)