diff --git a/Cargo.lock b/Cargo.lock index bf8456c..44f3808 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -16,6 +16,7 @@ dependencies = [ "rustc_version_runtime", "serde", "serde_json", + "tungstenite", "uuid", "windows", ] @@ -161,6 +162,15 @@ version = "2.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5c8214115b7bf84099f1309324e63141d4c5d7cc26862f97a0a857dbefe165bd" +[[package]] +name = "block-buffer" +version = "0.10.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3078c7629b62d3f0439517fa394996acacc5cbc91c5a20d8c658e77abd503a71" +dependencies = [ + "generic-array", +] + [[package]] name = "block2" version = "0.6.2" @@ -219,7 +229,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6f8d983286843e49675a4b7a2d174efe136dc93a18d69130dd18198a6c167601" dependencies = [ "cfg-if", - "cpufeatures", + "cpufeatures 0.3.0", "rand_core 0.10.0", ] @@ -311,6 +321,15 @@ version = "0.8.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "773648b94d0e5d620f64f280777445740e61fe701025087ec8b57f45c791888b" +[[package]] +name = "cpufeatures" +version = "0.2.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "59ed5838eebb26a2bb2e58f6d5b5316989ae9d08bab10e0e6d103e656d1b0280" +dependencies = [ + "libc", +] + [[package]] name = "cpufeatures" version = "0.3.0" @@ -320,6 +339,32 @@ dependencies = [ "libc", ] +[[package]] +name = "crypto-common" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "78c8292055d1c1df0cce5d180393dc8cce0abec0a7102adb6c7b1eef6016d60a" +dependencies = [ + "generic-array", + "typenum", +] + +[[package]] +name = "data-encoding" +version = "2.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d7a1e2f27636f116493b8b860f5546edb47c8d8f8ea73e1d2a20be88e28d1fea" + +[[package]] +name = "digest" +version = "0.10.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9ed9a281f7bc9b7576e61468ba615a66a5c8cfdff42420a70aa82701a3b1e292" +dependencies = [ + "block-buffer", + "crypto-common", +] + [[package]] name = "dirs-next" version = "1.0.2" @@ -476,6 +521,16 @@ dependencies = [ "slab", ] +[[package]] +name = "generic-array" +version = "0.14.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "85649ca51fd72272d7821adaf274ad91c288277713d9c18820d8499a7ff69e9a" +dependencies = [ + "typenum", + "version_check", +] + [[package]] name = "getrandom" version = "0.2.16" @@ -1396,6 +1451,8 @@ dependencies = [ "rustls", "rustls-pki-types", "rustls-platform-verifier", + "serde", + "serde_json", "sync_wrapper", "tokio", "tokio-rustls", @@ -1545,11 +1602,11 @@ dependencies = [ [[package]] name = "schannel" -version = "0.1.27" +version = "0.1.28" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1f29ebaa345f945cec9fbbc532eb307f0fdad8161f281b6369539c8d84876b3d" +checksum = "891d81b926048e76efe18581bf793546b4c0eaf8448d72be8de2bbee5fd166e1" dependencies = [ - "windows-sys 0.59.0", + "windows-sys 0.61.2", ] [[package]] @@ -1624,6 +1681,17 @@ dependencies = [ "zmij", ] +[[package]] +name = "sha1" +version = "0.10.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e3bf829a2d51ab4a5ddf1352d8470c140cadc8301b2ae1789db023f01cedd6ba" +dependencies = [ + "cfg-if", + "cpufeatures 0.2.17", + "digest", +] + [[package]] name = "shlex" version = "1.3.0" @@ -1898,6 +1966,32 @@ version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" +[[package]] +name = "tungstenite" +version = "0.28.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8628dcc84e5a09eb3d8423d6cb682965dea9133204e8fb3efee74c2a0c259442" +dependencies = [ + "bytes", + "data-encoding", + "http", + "httparse", + "log", + "rand 0.9.1", + "rustls", + "rustls-pki-types", + "sha1", + "thiserror 2.0.18", + "utf-8", + "webpki-roots 0.26.11", +] + +[[package]] +name = "typenum" +version = "1.19.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "562d481066bde0658276a35467c4af00bdc6ee726305698a55b86e61d7ad82bb" + [[package]] name = "unicase" version = "2.8.1" @@ -1933,6 +2027,12 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "utf-8" +version = "0.7.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9" + [[package]] name = "utf16_iter" version = "1.0.5" @@ -1963,6 +2063,12 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "version_check" +version = "0.9.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b928f33d975fc6ad9f86c8f283853ad26bdd5b10b7f1542aa2fa15e2289105a" + [[package]] name = "walkdir" version = "2.5.0" @@ -2149,6 +2255,24 @@ dependencies = [ "rustls-pki-types", ] +[[package]] +name = "webpki-roots" +version = "0.26.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "521bc38abb08001b01866da9f51eb7c5d647a19260e00054a8c7fd5f9e57f7a9" +dependencies = [ + "webpki-roots 1.0.6", +] + +[[package]] +name = "webpki-roots" +version = "1.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "22cfaf3c063993ff62e73cb4311efde4db1efb31ab78a3e5c457939ad5cc0bed" +dependencies = [ + "rustls-pki-types", +] + [[package]] name = "winapi" version = "0.3.9" diff --git a/Cargo.toml b/Cargo.toml index dad7766..c022437 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -28,7 +28,9 @@ uuid = { version = "1.22.0", features = [ "macro-diagnostics", # Enable better diagnostics for compile-time UUIDs ] } -reqwest = { version = "0.13", features = ["blocking", "multipart"] } +reqwest = { version = "0.13", features = ["blocking", "multipart", "json"] } + +tungstenite = { version = "0.28", features = ["rustls-tls-webpki-roots"] } rustc_version_runtime = "0.3.0" os_info = "3" diff --git a/src/commands/api/asciinema.rs b/src/commands/api/asciinema.rs index 32d3f98..995e0e2 100644 --- a/src/commands/api/asciinema.rs +++ b/src/commands/api/asciinema.rs @@ -1,4 +1,4 @@ -use super::ApiService; +use super::{ApiService, StreamInfo}; use base64::Engine; use base64::prelude::BASE64_STANDARD; @@ -193,11 +193,73 @@ impl ApiService for Asciinema { None } } + + fn create_stream(&self, cols: u16, rows: u16) -> Option { + #[derive(Deserialize)] + struct CreateStreamResponse { + id: String, + url: String, + ws_producer_url: String, + } + + let stream_url = format!("{}/api/streams", &self.config.api_server); + let body = serde_json::json!({ "cols": cols, "rows": rows }); + let res = match self + .http_client + .post(stream_url) + .json(&body) + .send() + { + Ok(r) => r, + Err(e) => { + println!("Failed to reach stream server: {}", e); + return None; + } + }; + + if res.status().is_success() { + match res.json::() { + Ok(resp) => Some(StreamInfo { + id: resp.id, + url: resp.url, + ws_producer_url: resp.ws_producer_url, + }), + Err(e) => { + println!("Failed to parse stream response: {}", e); + None + } + } + } else { + println!("Failed to create stream:"); + println!("{}", res.text().unwrap_or_default()); + None + } + } + + fn get_stream_ws_url(&self, stream_id: &str) -> String { + // Derive the producer WebSocket URL from the server base URL and stream ID. + // The path follows the asciinema server convention: /ws/S/ + let base = self + .config + .api_server + .trim_end_matches('/') + .replace("https://", "wss://") + .replace("http://", "ws://"); + format!("{}/ws/S/{}", base, stream_id) + } + + fn get_auth_header(&self) -> String { + let cred = format!("user:{}", self.config.install_id); + format!("Basic {}", BASE64_STANDARD.encode(&cred)) + } } #[cfg(test)] mod tests { + use base64::prelude::BASE64_STANDARD; + use base64::Engine; use crate::commands::api::asciinema::Config; + use crate::commands::api::ApiService; use uuid::{Uuid, Version}; @@ -207,4 +269,50 @@ mod tests { let uuid = Uuid::parse_str(&c.install_id); assert_eq!(uuid.unwrap().get_version(), Some(Version::Random)); // uuid4 } + + #[test] + fn test_get_stream_ws_url_https_and_http() { + let client = reqwest::blocking::Client::new(); + + let asc_https = super::Asciinema { + config: Config { + install_id: "install".to_string(), + api_server: "https://demo.asciinema.org/".to_string(), + location: String::new(), + }, + http_client: client.clone(), + }; + assert_eq!( + asc_https.get_stream_ws_url("abc123"), + "wss://demo.asciinema.org/ws/S/abc123" + ); + + let asc_http = super::Asciinema { + config: Config { + install_id: "install".to_string(), + api_server: "http://asciinema.test".to_string(), + location: String::new(), + }, + http_client: client, + }; + assert_eq!( + asc_http.get_stream_ws_url("xyz"), + "ws://asciinema.test/ws/S/xyz" + ); + } + + #[test] + fn test_get_auth_header_format() { + let asc = super::Asciinema { + config: Config { + install_id: "token-123".to_string(), + api_server: "https://example".to_string(), + location: String::new(), + }, + http_client: reqwest::blocking::Client::new(), + }; + + let expected = format!("Basic {}", BASE64_STANDARD.encode("user:token-123")); + assert_eq!(asc.get_auth_header(), expected); + } } diff --git a/src/commands/api/mod.rs b/src/commands/api/mod.rs index f6beeb0..cc82848 100644 --- a/src/commands/api/mod.rs +++ b/src/commands/api/mod.rs @@ -1,8 +1,21 @@ mod asciinema; +pub struct StreamInfo { + pub id: String, + pub url: String, + pub ws_producer_url: String, +} + pub trait ApiService { fn auth(&self); fn upload(&self, filepath: &str) -> Option; + /// Create a new live stream on the server. Returns `StreamInfo` with the public viewer + /// URL and the WebSocket producer URL to push events to. + fn create_stream(&self, cols: u16, rows: u16) -> Option; + /// Build the WebSocket producer URL for an *existing* stream identified by `stream_id`. + fn get_stream_ws_url(&self, stream_id: &str) -> String; + /// Return the `Authorization` header value to authenticate WebSocket connections. + fn get_auth_header(&self) -> String; } pub use asciinema::Asciinema; diff --git a/src/commands/mod.rs b/src/commands/mod.rs index e715c3e..b7320a8 100644 --- a/src/commands/mod.rs +++ b/src/commands/mod.rs @@ -4,12 +4,14 @@ mod api; mod auth; mod play; mod record; +mod stream; mod types; mod upload; pub use auth::Auth; pub use play::Play; pub use record::Record; +pub use stream::Stream; pub use upload::Upload; -pub use api::Asciinema; +pub use api::{ApiService, Asciinema}; diff --git a/src/commands/stream.rs b/src/commands/stream.rs new file mode 100644 index 0000000..3d4c620 --- /dev/null +++ b/src/commands/stream.rs @@ -0,0 +1,263 @@ +use std::{ + env, + io::ErrorKind, + sync::{mpsc::channel, Arc, Mutex}, + thread, + time::{Duration, SystemTime}, +}; + +use log::{error, trace}; +use tungstenite::stream::MaybeTlsStream; +use tungstenite::Message; + +#[cfg(windows)] +use windows::Win32::{ + Foundation::HANDLE, + Storage::FileSystem::ReadFile, + System::Console::{GetStdHandle, WriteConsoleW, STD_INPUT_HANDLE, STD_OUTPUT_HANDLE}, +}; + +use crate::commands::types::LineItem; +use crate::terminal::{Terminal, WindowsTerminal}; + +pub struct Stream { + ws_url: String, + stream_url: String, + auth_header: String, + command: String, + #[cfg(windows)] + terminal: WindowsTerminal, +} + +impl Stream { + pub fn new( + ws_url: String, + stream_url: String, + auth_header: String, + command: Option, + ) -> Self { + Stream { + ws_url, + stream_url, + auth_header, + command: command + .unwrap_or_else(|| env::var("SHELL").unwrap_or("powershell.exe".to_owned())), + terminal: WindowsTerminal::new(None), + } + } + + pub fn execute(&mut self) { + println!("Streaming. Watch at: {}", self.stream_url); + println!("Exit the shell/command to stop streaming."); + self.stream(); + } + + fn stream(&mut self) { + let now = SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .expect("check your machine time"); + let record_start_time = now.as_secs() as f64 + now.subsec_nanos() as f64 * 1e-9; + + // Build the WebSocket connection request, including the Authorization header so + // both freshly-created streams and reconnects to existing streams authenticate. + let request = tungstenite::http::Request::builder() + .uri(&self.ws_url) + .header("Authorization", &self.auth_header) + .body(()) + .expect("failed to build WebSocket request"); + + let (mut ws, _) = + tungstenite::connect(request).expect("failed to connect to stream server"); + match ws.get_mut() { + MaybeTlsStream::Plain(stream) => { + stream + .set_nonblocking(true) + .unwrap_or_else(|e| trace!("failed to set non-blocking websocket: {}", e)); + } + MaybeTlsStream::Rustls(stream) => { + stream + .get_mut() + .set_nonblocking(true) + .unwrap_or_else(|e| trace!("failed to set non-blocking websocket: {}", e)); + } + _ => {} + } + let ws = Arc::new(Mutex::new(ws)); + + { + // Send an asciicast-compatible reset event so the server knows the terminal size. + let reset_data = format!("{}x{}", self.terminal.width, self.terminal.height); + let reset_event = serde_json::to_string(&[ + LineItem::F64(0.0), + LineItem::String("r".to_string()), + LineItem::String(reset_data), + ]) + .unwrap(); + let mut sock = ws.lock().expect("websocket mutex poisoned"); + sock.send(Message::Text(reset_event.into())) + .expect("failed to send reset event"); + } + + // Keep a read loop alive to respond to Ping/Pong/Close frames from the server. + let ws_reader = ws.clone(); + thread::spawn(move || loop { + let msg = { + let mut sock = ws_reader.lock().expect("websocket mutex poisoned"); + sock.read() + }; + + match msg { + Ok(Message::Ping(payload)) => { + if let Ok(mut sock) = ws_reader.lock() { + let _ = sock.send(Message::Pong(payload)); + } + } + Ok(Message::Close(frame)) => { + trace!("server closed stream: {:?}", frame); + break; + } + Ok(_) => {} + Err(tungstenite::Error::Io(ref e)) + if e.kind() == ErrorKind::WouldBlock || e.kind() == ErrorKind::TimedOut => + { + thread::sleep(Duration::from_millis(25)); + } + Err(tungstenite::Error::AlreadyClosed | tungstenite::Error::ConnectionClosed) => { + break; + } + Err(e) => { + error!("websocket read error: {}", e); + break; + } + } + }); + + let (stdin_tx, stdin_rx) = channel::<(Vec, usize)>(); + let (stdout_tx, stdout_rx) = channel::<(Vec, usize)>(); + + // On Windows, use ReadFile directly to preserve ESC sequences (same as record.rs). + #[cfg(windows)] + let stdin_handle: isize = unsafe { + GetStdHandle(STD_INPUT_HANDLE) + .expect("failed to get Windows stdin handle (STD_INPUT_HANDLE)") + .0 as isize + }; + + thread::spawn(move || loop { + let mut buf = [0u8; 10]; + + #[cfg(windows)] + let n = { + let mut n_read: u32 = 0; + let ok = unsafe { + ReadFile( + HANDLE(stdin_handle as _), + Some(&mut buf), + Some(&mut n_read), + None, + ) + .is_ok() + }; + if !ok { + panic!("ReadFile on stdin failed"); + } + if n_read == 0 { + panic!("pty stdin closed"); + } + n_read as usize + }; + + #[cfg(not(windows))] + let n = { + use std::io::Read; + match std::io::stdin().lock().read(&mut buf) { + Ok(n) if n > 0 => n, + _ => panic!("pty stdin closed"), + } + }; + + stdin_tx.send((buf.to_vec(), n)).unwrap(); + }); + + // Stdout thread: read pty output, display it locally, and forward it to the WebSocket. + let ws_writer = ws.clone(); + thread::spawn(move || { + #[cfg(windows)] + let stdout_handle: HANDLE = unsafe { + GetStdHandle(STD_OUTPUT_HANDLE).expect("failed to get stdout handle") + }; + + let mut pending_bytes: Vec = Vec::new(); + + loop { + let rv = stdout_rx.recv(); + match rv { + Ok((buf, len)) => { + if len == 0 { + trace!("stdout received close indicator"); + println!("\nStreaming session ended."); + if let Ok(mut sock) = ws_writer.lock() { + let _ = sock.close(None); + } + break; + } + + let now = SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .expect("check your machine time"); + let ts = now.as_secs() as f64 + now.subsec_nanos() as f64 * 1e-9 + - record_start_time; + + pending_bytes.extend_from_slice(&buf[..len]); + + let valid_up_to = match std::str::from_utf8(&pending_bytes) { + Ok(_) => pending_bytes.len(), + Err(e) => e.valid_up_to(), + }; + + if valid_up_to > 0 { + let chars = + std::str::from_utf8(&pending_bytes[..valid_up_to]).unwrap(); + + let data = vec![ + LineItem::F64(ts), + LineItem::String("o".to_string()), + LineItem::String(chars.to_string()), + ]; + let event = serde_json::to_string(&data).unwrap(); + + let send_result = ws_writer + .lock() + .expect("websocket mutex poisoned") + .send(Message::Text(event.into())); + + if let Err(e) = send_result { + error!("failed to send WebSocket message: {}", e); + break; + } + + // Echo output to the local console as well. + #[cfg(windows)] + unsafe { + let utf16: Vec = chars.encode_utf16().collect(); + WriteConsoleW(stdout_handle, &utf16, None, None) + .expect("failed to write stdout"); + } + } + + pending_bytes.drain(..valid_up_to); + } + + Err(err) => { + error!("reading stdout: {}", err.to_string()); + break; + } + } + } + }); + + self.terminal.attach_stdin(stdin_rx); + self.terminal.attach_stdout(stdout_tx); + self.terminal.run(&self.command).unwrap(); + } +} diff --git a/src/main.rs b/src/main.rs index bfa971f..2a59e69 100644 --- a/src/main.rs +++ b/src/main.rs @@ -7,8 +7,8 @@ mod terminal; use clap::builder::styling::{AnsiColor, Styles}; use clap::{Arg, Command, crate_version}; -use commands::{Asciinema, Auth, Play}; -use commands::{Record, Upload}; +use commands::{Asciinema, Auth, Play, Stream}; +use commands::{ApiService, Record, Upload}; use fern::colors::ColoredLevelConfig; use log::trace; @@ -130,6 +130,23 @@ fn main() { .required(true), ), ) + .subcommand( + Command::new("stream") + .about("Stream a live terminal session to the asciinema server") + .arg( + Arg::new("command") + .help("The command to stream, defaults to $SHELL") + .num_args(1) + .short('c') + .long("command"), + ) + .arg( + Arg::new("id") + .help("Reconnect to an existing stream by its ID") + .num_args(1) + .long("id"), + ), + ) .arg( Arg::new("log-level") .help("can be one of [error|warn|info|debug|trace]") @@ -199,6 +216,49 @@ fn main() { Err(_) => println!("Error: not a correct URL - e.g: https://asciinema.org"), } } + Some(("stream", stream_matches)) => { + let api_service = Asciinema::new(); + let command = stream_matches + .get_one::("command") + .map(Into::into); + let auth_header = api_service.get_auth_header(); + + let (ws_url, stream_url) = + if let Some(id) = stream_matches.get_one::("id") { + // Reconnect to an existing stream using its ID. + let ws = api_service.get_stream_ws_url(id); + // Derive the viewer URL: wss://host/ws/S/ -> https://host/s/ + let viewer = ws + .replace("wss://", "https://") + .replace("ws://", "http://") + .replace("/ws/S/", "/s/"); + (ws, viewer) + } else { + // Create a fresh stream on the server. + // Read the current terminal size to inform the server. + #[cfg(windows)] + let (cols, rows) = { + crate::terminal::WindowsTerminal::console_size() + .unwrap_or((80u16, 24u16)) + }; + #[cfg(not(windows))] + let (cols, rows) = (80u16, 24u16); + + match api_service.create_stream(cols, rows) { + Some(info) => (info.ws_producer_url, info.url), + None => { + eprintln!( + "Failed to create stream. \ + Is the server reachable and are you authenticated?" + ); + std::process::exit(1); + } + } + }; + + let mut stream = Stream::new(ws_url, stream_url, auth_header, command); + stream.execute(); + } _ => unreachable!(), } } diff --git a/src/terminal/impl_win/terminal.rs b/src/terminal/impl_win/terminal.rs index 8040c1c..194ccc4 100644 --- a/src/terminal/impl_win/terminal.rs +++ b/src/terminal/impl_win/terminal.rs @@ -61,6 +61,10 @@ impl WindowsTerminal { } } + pub fn console_size() -> Option<(u16, u16)> { + unsafe { WindowsTerminal::get_console_size().ok().map(|(x, y)| (x as u16, y as u16)) } + } + fn create_pseudo_console_and_pipes( handle: &mut HPCON, stdin: &mut HANDLE, // the stdin to write input to PTY