From 4e1acf1442510b4b5b909936ba9ec603764b1759 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Tue, 10 Mar 2026 02:12:19 +0000 Subject: [PATCH 1/6] Initial plan From f822941150eec7a67a4d8bccf17080967ce76467 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Tue, 10 Mar 2026 02:35:42 +0000 Subject: [PATCH 2/6] feat: add stream subcommand for live terminal session streaming Co-authored-by: ibigbug <543405+ibigbug@users.noreply.github.com> --- Cargo.lock | 128 ++++++++++++++++++++- Cargo.toml | 2 + src/commands/api/asciinema.rs | 61 +++++++++- src/commands/api/mod.rs | 13 +++ src/commands/mod.rs | 4 +- src/commands/stream.rs | 206 ++++++++++++++++++++++++++++++++++ src/main.rs | 64 ++++++++++- 7 files changed, 471 insertions(+), 7 deletions(-) create mode 100644 src/commands/stream.rs diff --git a/Cargo.lock b/Cargo.lock index 9a38ca0..9a8aa2a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -16,6 +16,7 @@ dependencies = [ "rustc_version_runtime", "serde", "serde_json", + "tungstenite", "uuid", "windows", ] @@ -161,6 +162,15 @@ version = "2.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5c8214115b7bf84099f1309324e63141d4c5d7cc26862f97a0a857dbefe165bd" +[[package]] +name = "block-buffer" +version = "0.10.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3078c7629b62d3f0439517fa394996acacc5cbc91c5a20d8c658e77abd503a71" +dependencies = [ + "generic-array", +] + [[package]] name = "block2" version = "0.6.2" @@ -300,6 +310,41 @@ version = "0.8.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "773648b94d0e5d620f64f280777445740e61fe701025087ec8b57f45c791888b" +[[package]] +name = "cpufeatures" +version = "0.2.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "59ed5838eebb26a2bb2e58f6d5b5316989ae9d08bab10e0e6d103e656d1b0280" +dependencies = [ + "libc", +] + +[[package]] +name = "crypto-common" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "78c8292055d1c1df0cce5d180393dc8cce0abec0a7102adb6c7b1eef6016d60a" +dependencies = [ + "generic-array", + "typenum", +] + +[[package]] +name = "data-encoding" +version = "2.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d7a1e2f27636f116493b8b860f5546edb47c8d8f8ea73e1d2a20be88e28d1fea" + +[[package]] +name = "digest" +version = "0.10.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9ed9a281f7bc9b7576e61468ba615a66a5c8cfdff42420a70aa82701a3b1e292" +dependencies = [ + "block-buffer", + "crypto-common", +] + [[package]] name = "dirs-next" version = "1.0.2" @@ -456,6 +501,16 @@ dependencies = [ "slab", ] +[[package]] +name = "generic-array" +version = "0.14.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "85649ca51fd72272d7821adaf274ad91c288277713d9c18820d8499a7ff69e9a" +dependencies = [ + "typenum", + "version_check", +] + [[package]] name = "getrandom" version = "0.2.16" @@ -1507,11 +1562,11 @@ dependencies = [ [[package]] name = "schannel" -version = "0.1.27" +version = "0.1.28" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1f29ebaa345f945cec9fbbc532eb307f0fdad8161f281b6369539c8d84876b3d" +checksum = "891d81b926048e76efe18581bf793546b4c0eaf8448d72be8de2bbee5fd166e1" dependencies = [ - "windows-sys 0.59.0", + "windows-sys 0.61.2", ] [[package]] @@ -1586,6 +1641,17 @@ dependencies = [ "zmij", ] +[[package]] +name = "sha1" +version = "0.10.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e3bf829a2d51ab4a5ddf1352d8470c140cadc8301b2ae1789db023f01cedd6ba" +dependencies = [ + "cfg-if", + "cpufeatures", + "digest", +] + [[package]] name = "shlex" version = "1.3.0" @@ -1860,6 +1926,32 @@ version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" +[[package]] +name = "tungstenite" +version = "0.28.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8628dcc84e5a09eb3d8423d6cb682965dea9133204e8fb3efee74c2a0c259442" +dependencies = [ + "bytes", + "data-encoding", + "http", + "httparse", + "log", + "rand", + "rustls", + "rustls-pki-types", + "sha1", + "thiserror 2.0.18", + "utf-8", + "webpki-roots 0.26.11", +] + +[[package]] +name = "typenum" +version = "1.19.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "562d481066bde0658276a35467c4af00bdc6ee726305698a55b86e61d7ad82bb" + [[package]] name = "unicase" version = "2.8.1" @@ -1895,6 +1987,12 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "utf-8" +version = "0.7.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9" + [[package]] name = "utf16_iter" version = "1.0.5" @@ -1925,6 +2023,12 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "version_check" +version = "0.9.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b928f33d975fc6ad9f86c8f283853ad26bdd5b10b7f1542aa2fa15e2289105a" + [[package]] name = "walkdir" version = "2.5.0" @@ -2111,6 +2215,24 @@ dependencies = [ "rustls-pki-types", ] +[[package]] +name = "webpki-roots" +version = "0.26.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "521bc38abb08001b01866da9f51eb7c5d647a19260e00054a8c7fd5f9e57f7a9" +dependencies = [ + "webpki-roots 1.0.6", +] + +[[package]] +name = "webpki-roots" +version = "1.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "22cfaf3c063993ff62e73cb4311efde4db1efb31ab78a3e5c457939ad5cc0bed" +dependencies = [ + "rustls-pki-types", +] + [[package]] name = "winapi" version = "0.3.9" diff --git a/Cargo.toml b/Cargo.toml index e142428..2077e2b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -30,6 +30,8 @@ uuid = { version = "1.21.0", features = [ reqwest = { version = "0.13", features = ["blocking", "multipart"] } +tungstenite = { version = "0.28", features = ["rustls-tls-webpki-roots"] } + rustc_version_runtime = "0.3.0" os_info = "3" base64 = "0.22" diff --git a/src/commands/api/asciinema.rs b/src/commands/api/asciinema.rs index 32d3f98..317d84f 100644 --- a/src/commands/api/asciinema.rs +++ b/src/commands/api/asciinema.rs @@ -1,4 +1,4 @@ -use super::ApiService; +use super::{ApiService, StreamInfo}; use base64::Engine; use base64::prelude::BASE64_STANDARD; @@ -193,6 +193,65 @@ impl ApiService for Asciinema { None } } + + fn create_stream(&self, cols: u16, rows: u16) -> Option { + #[derive(Deserialize)] + struct CreateStreamResponse { + id: String, + url: String, + ws_producer_url: String, + } + + let stream_url = format!("{}/api/streams", &self.config.api_server); + let body = serde_json::json!({ "cols": cols, "rows": rows }); + let res = match self + .http_client + .post(stream_url) + .json(&body) + .send() + { + Ok(r) => r, + Err(e) => { + println!("Failed to reach stream server: {}", e); + return None; + } + }; + + if res.status().is_success() { + match res.json::() { + Ok(resp) => Some(StreamInfo { + id: resp.id, + url: resp.url, + ws_producer_url: resp.ws_producer_url, + }), + Err(e) => { + println!("Failed to parse stream response: {}", e); + None + } + } + } else { + println!("Failed to create stream:"); + println!("{}", res.text().unwrap_or_default()); + None + } + } + + fn get_stream_ws_url(&self, stream_id: &str) -> String { + // Derive the producer WebSocket URL from the server base URL and stream ID. + // The path follows the asciinema server convention: /ws/S/ + let base = self + .config + .api_server + .trim_end_matches('/') + .replace("https://", "wss://") + .replace("http://", "ws://"); + format!("{}/ws/S/{}", base, stream_id) + } + + fn get_auth_header(&self) -> String { + let cred = format!("user:{}", self.config.install_id); + format!("Basic {}", BASE64_STANDARD.encode(&cred)) + } } #[cfg(test)] diff --git a/src/commands/api/mod.rs b/src/commands/api/mod.rs index f6beeb0..cc82848 100644 --- a/src/commands/api/mod.rs +++ b/src/commands/api/mod.rs @@ -1,8 +1,21 @@ mod asciinema; +pub struct StreamInfo { + pub id: String, + pub url: String, + pub ws_producer_url: String, +} + pub trait ApiService { fn auth(&self); fn upload(&self, filepath: &str) -> Option; + /// Create a new live stream on the server. Returns `StreamInfo` with the public viewer + /// URL and the WebSocket producer URL to push events to. + fn create_stream(&self, cols: u16, rows: u16) -> Option; + /// Build the WebSocket producer URL for an *existing* stream identified by `stream_id`. + fn get_stream_ws_url(&self, stream_id: &str) -> String; + /// Return the `Authorization` header value to authenticate WebSocket connections. + fn get_auth_header(&self) -> String; } pub use asciinema::Asciinema; diff --git a/src/commands/mod.rs b/src/commands/mod.rs index e715c3e..b7320a8 100644 --- a/src/commands/mod.rs +++ b/src/commands/mod.rs @@ -4,12 +4,14 @@ mod api; mod auth; mod play; mod record; +mod stream; mod types; mod upload; pub use auth::Auth; pub use play::Play; pub use record::Record; +pub use stream::Stream; pub use upload::Upload; -pub use api::Asciinema; +pub use api::{ApiService, Asciinema}; diff --git a/src/commands/stream.rs b/src/commands/stream.rs new file mode 100644 index 0000000..6eab064 --- /dev/null +++ b/src/commands/stream.rs @@ -0,0 +1,206 @@ +use std::{ + collections::HashMap, + env, + sync::mpsc::channel, + thread, + time::SystemTime, +}; + +use log::{error, trace}; +use tungstenite::Message; + +#[cfg(windows)] +use windows::Win32::{ + Foundation::HANDLE, + Storage::FileSystem::ReadFile, + System::Console::{GetStdHandle, WriteConsoleW, STD_INPUT_HANDLE, STD_OUTPUT_HANDLE}, +}; + +use crate::commands::types::LineItem; +use crate::terminal::{Terminal, WindowsTerminal}; + +pub struct Stream { + ws_url: String, + stream_url: String, + auth_header: String, + #[allow(dead_code)] + env: HashMap, + command: String, + #[cfg(windows)] + terminal: WindowsTerminal, +} + +impl Stream { + pub fn new( + ws_url: String, + stream_url: String, + auth_header: String, + env: Option>, + command: Option, + ) -> Self { + Stream { + ws_url, + stream_url, + auth_header, + env: env.unwrap_or_default(), + command: command + .unwrap_or_else(|| env::var("SHELL").unwrap_or("powershell.exe".to_owned())), + terminal: WindowsTerminal::new(None), + } + } + + pub fn execute(&mut self) { + println!("Streaming. Watch at: {}", self.stream_url); + println!("Press Ctrl+D to stop."); + self.stream(); + } + + fn stream(&mut self) { + let now = SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .expect("check your machine time"); + let record_start_time = now.as_secs() as f64 + now.subsec_nanos() as f64 * 1e-9; + + // Build the WebSocket connection request, including the Authorization header so + // both freshly-created streams and reconnects to existing streams authenticate. + let request = tungstenite::http::Request::builder() + .uri(&self.ws_url) + .header("Authorization", &self.auth_header) + .body(()) + .expect("failed to build WebSocket request"); + + let (mut ws, _) = + tungstenite::connect(request).expect("failed to connect to stream server"); + + // Send an asciicast-compatible reset event so the server knows the terminal size. + let reset_data = format!("{}x{}", self.terminal.width, self.terminal.height); + let reset_event = serde_json::to_string(&[ + LineItem::F64(0.0), + LineItem::String("r".to_string()), + LineItem::String(reset_data), + ]) + .unwrap(); + ws.send(Message::Text(reset_event.into())) + .expect("failed to send reset event"); + + let (stdin_tx, stdin_rx) = channel::<(Vec, usize)>(); + let (stdout_tx, stdout_rx) = channel::<(Vec, usize)>(); + + // On Windows, use ReadFile directly to preserve ESC sequences (same as record.rs). + #[cfg(windows)] + let stdin_handle: isize = unsafe { + GetStdHandle(STD_INPUT_HANDLE) + .expect("failed to get Windows stdin handle (STD_INPUT_HANDLE)") + .0 as isize + }; + + thread::spawn(move || loop { + let mut buf = [0u8; 10]; + + #[cfg(windows)] + let n = { + let mut n_read: u32 = 0; + let ok = unsafe { + ReadFile( + HANDLE(stdin_handle as _), + Some(&mut buf), + Some(&mut n_read), + None, + ) + .is_ok() + }; + if !ok { + panic!("ReadFile on stdin failed"); + } + if n_read == 0 { + panic!("pty stdin closed"); + } + n_read as usize + }; + + #[cfg(not(windows))] + let n = { + use std::io::Read; + match std::io::stdin().lock().read(&mut buf) { + Ok(n) if n > 0 => n, + _ => panic!("pty stdin closed"), + } + }; + + stdin_tx.send((buf.to_vec(), n)).unwrap(); + }); + + // Stdout thread: read pty output, display it locally, and forward it to the WebSocket. + thread::spawn(move || { + #[cfg(windows)] + let stdout_handle: HANDLE = unsafe { + GetStdHandle(STD_OUTPUT_HANDLE).expect("failed to get stdout handle") + }; + + let mut pending_bytes: Vec = Vec::new(); + + loop { + let rv = stdout_rx.recv(); + match rv { + Ok((buf, len)) => { + if len == 0 { + trace!("stdout received close indicator"); + println!("\nStreaming session ended."); + ws.close(None).ok(); + break; + } + + let now = SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .expect("check your machine time"); + let ts = now.as_secs() as f64 + now.subsec_nanos() as f64 * 1e-9 + - record_start_time; + + pending_bytes.extend_from_slice(&buf[..len]); + + let valid_up_to = match std::str::from_utf8(&pending_bytes) { + Ok(_) => pending_bytes.len(), + Err(e) => e.valid_up_to(), + }; + + if valid_up_to > 0 { + let chars = + std::str::from_utf8(&pending_bytes[..valid_up_to]).unwrap(); + + let data = vec![ + LineItem::F64(ts), + LineItem::String("o".to_string()), + LineItem::String(chars.to_string()), + ]; + let event = serde_json::to_string(&data).unwrap(); + + if let Err(e) = ws.send(Message::Text(event.into())) { + error!("failed to send WebSocket message: {}", e); + break; + } + + // Echo output to the local console as well. + #[cfg(windows)] + unsafe { + let utf16: Vec = chars.encode_utf16().collect(); + WriteConsoleW(stdout_handle, &utf16, None, None) + .expect("failed to write stdout"); + } + } + + pending_bytes.drain(..valid_up_to); + } + + Err(err) => { + error!("reading stdout: {}", err.to_string()); + break; + } + } + } + }); + + self.terminal.attach_stdin(stdin_rx); + self.terminal.attach_stdout(stdout_tx); + self.terminal.run(&self.command).unwrap(); + } +} diff --git a/src/main.rs b/src/main.rs index bfa971f..e142017 100644 --- a/src/main.rs +++ b/src/main.rs @@ -7,8 +7,8 @@ mod terminal; use clap::builder::styling::{AnsiColor, Styles}; use clap::{Arg, Command, crate_version}; -use commands::{Asciinema, Auth, Play}; -use commands::{Record, Upload}; +use commands::{Asciinema, Auth, Play, Stream}; +use commands::{ApiService, Record, Upload}; use fern::colors::ColoredLevelConfig; use log::trace; @@ -130,6 +130,23 @@ fn main() { .required(true), ), ) + .subcommand( + Command::new("stream") + .about("Stream a live terminal session to the asciinema server") + .arg( + Arg::new("command") + .help("The command to stream, defaults to $SHELL") + .num_args(1) + .short('c') + .long("command"), + ) + .arg( + Arg::new("id") + .help("Reconnect to an existing stream by its ID") + .num_args(1) + .long("id"), + ), + ) .arg( Arg::new("log-level") .help("can be one of [error|warn|info|debug|trace]") @@ -199,6 +216,49 @@ fn main() { Err(_) => println!("Error: not a correct URL - e.g: https://asciinema.org"), } } + Some(("stream", stream_matches)) => { + let api_service = Asciinema::new(); + let command = stream_matches + .get_one::("command") + .map(Into::into); + let auth_header = api_service.get_auth_header(); + + let (ws_url, stream_url) = + if let Some(id) = stream_matches.get_one::("id") { + // Reconnect to an existing stream using its ID. + let ws = api_service.get_stream_ws_url(id); + // Derive the viewer URL: wss://host/ws/S/ -> https://host/s/ + let viewer = ws + .replace("wss://", "https://") + .replace("ws://", "http://") + .replace("/ws/S/", "/s/"); + (ws, viewer) + } else { + // Create a fresh stream on the server. + // Read the current terminal size to inform the server. + #[cfg(windows)] + let (cols, rows) = { + let t = crate::terminal::WindowsTerminal::new(None); + (t.width as u16, t.height as u16) + }; + #[cfg(not(windows))] + let (cols, rows) = (80u16, 24u16); + + match api_service.create_stream(cols, rows) { + Some(info) => (info.ws_producer_url, info.url), + None => { + eprintln!( + "Failed to create stream. \ + Is the server reachable and are you authenticated?" + ); + std::process::exit(1); + } + } + }; + + let mut stream = Stream::new(ws_url, stream_url, auth_header, None, command); + stream.execute(); + } _ => unreachable!(), } } From 30593d20663b73486f55ed140c93985afd21792a Mon Sep 17 00:00:00 2001 From: "openai-code-agent[bot]" <242516109+Codex@users.noreply.github.com> Date: Tue, 10 Mar 2026 04:36:30 +0000 Subject: [PATCH 3/6] address stream command review feedback --- Cargo.lock | 2 + Cargo.toml | 2 +- src/commands/api/asciinema.rs | 47 ++++++++++++++ src/commands/stream.rs | 102 ++++++++++++++++++++++++------ src/main.rs | 6 +- src/terminal/impl_win/terminal.rs | 4 ++ 6 files changed, 139 insertions(+), 24 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 9a8aa2a..e161dfd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1413,6 +1413,8 @@ dependencies = [ "rustls", "rustls-pki-types", "rustls-platform-verifier", + "serde", + "serde_json", "sync_wrapper", "tokio", "tokio-rustls", diff --git a/Cargo.toml b/Cargo.toml index 2077e2b..925d058 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -28,7 +28,7 @@ uuid = { version = "1.21.0", features = [ "macro-diagnostics", # Enable better diagnostics for compile-time UUIDs ] } -reqwest = { version = "0.13", features = ["blocking", "multipart"] } +reqwest = { version = "0.13", features = ["blocking", "multipart", "json"] } tungstenite = { version = "0.28", features = ["rustls-tls-webpki-roots"] } diff --git a/src/commands/api/asciinema.rs b/src/commands/api/asciinema.rs index 317d84f..f73286f 100644 --- a/src/commands/api/asciinema.rs +++ b/src/commands/api/asciinema.rs @@ -256,6 +256,7 @@ impl ApiService for Asciinema { #[cfg(test)] mod tests { + use base64::prelude::BASE64_STANDARD; use crate::commands::api::asciinema::Config; use uuid::{Uuid, Version}; @@ -266,4 +267,50 @@ mod tests { let uuid = Uuid::parse_str(&c.install_id); assert_eq!(uuid.unwrap().get_version(), Some(Version::Random)); // uuid4 } + + #[test] + fn test_get_stream_ws_url_https_and_http() { + let client = reqwest::blocking::Client::new(); + + let asc_https = super::Asciinema { + config: Config { + install_id: "install".to_string(), + api_server: "https://demo.asciinema.org/".to_string(), + location: String::new(), + }, + http_client: client.clone(), + }; + assert_eq!( + asc_https.get_stream_ws_url("abc123"), + "wss://demo.asciinema.org/ws/S/abc123" + ); + + let asc_http = super::Asciinema { + config: Config { + install_id: "install".to_string(), + api_server: "http://asciinema.test".to_string(), + location: String::new(), + }, + http_client: client, + }; + assert_eq!( + asc_http.get_stream_ws_url("xyz"), + "ws://asciinema.test/ws/S/xyz" + ); + } + + #[test] + fn test_get_auth_header_format() { + let asc = super::Asciinema { + config: Config { + install_id: "token-123".to_string(), + api_server: "https://example".to_string(), + location: String::new(), + }, + http_client: reqwest::blocking::Client::new(), + }; + + let expected = format!("Basic {}", BASE64_STANDARD.encode("user:token-123")); + assert_eq!(asc.get_auth_header(), expected); + } } diff --git a/src/commands/stream.rs b/src/commands/stream.rs index 6eab064..1d7c356 100644 --- a/src/commands/stream.rs +++ b/src/commands/stream.rs @@ -1,12 +1,13 @@ use std::{ - collections::HashMap, env, - sync::mpsc::channel, + io::ErrorKind, + sync::{mpsc::channel, Arc, Mutex}, thread, - time::SystemTime, + time::{Duration, SystemTime}, }; use log::{error, trace}; +use tungstenite::stream::MaybeTlsStream; use tungstenite::Message; #[cfg(windows)] @@ -23,8 +24,6 @@ pub struct Stream { ws_url: String, stream_url: String, auth_header: String, - #[allow(dead_code)] - env: HashMap, command: String, #[cfg(windows)] terminal: WindowsTerminal, @@ -35,14 +34,12 @@ impl Stream { ws_url: String, stream_url: String, auth_header: String, - env: Option>, command: Option, ) -> Self { Stream { ws_url, stream_url, auth_header, - env: env.unwrap_or_default(), command: command .unwrap_or_else(|| env::var("SHELL").unwrap_or("powershell.exe".to_owned())), terminal: WindowsTerminal::new(None), @@ -51,7 +48,7 @@ impl Stream { pub fn execute(&mut self) { println!("Streaming. Watch at: {}", self.stream_url); - println!("Press Ctrl+D to stop."); + println!("Exit the shell/command to stop (Ctrl+Z then Enter on Windows)."); self.stream(); } @@ -71,17 +68,74 @@ impl Stream { let (mut ws, _) = tungstenite::connect(request).expect("failed to connect to stream server"); + match ws.get_mut() { + MaybeTlsStream::Plain(stream) => { + stream + .set_nonblocking(true) + .unwrap_or_else(|e| trace!("failed to set non-blocking websocket: {}", e)); + } + MaybeTlsStream::Rustls(stream) => { + let (_, tcp) = stream.get_mut(); + tcp.set_nonblocking(true) + .unwrap_or_else(|e| trace!("failed to set non-blocking websocket: {}", e)); + } + #[cfg(feature = "native-tls")] + MaybeTlsStream::NativeTls(stream) => { + stream + .get_mut() + .set_nonblocking(true) + .unwrap_or_else(|e| trace!("failed to set non-blocking websocket: {}", e)); + } + } + let ws = Arc::new(Mutex::new(ws)); + + { + // Send an asciicast-compatible reset event so the server knows the terminal size. + let reset_data = format!("{}x{}", self.terminal.width, self.terminal.height); + let reset_event = serde_json::to_string(&[ + LineItem::F64(0.0), + LineItem::String("r".to_string()), + LineItem::String(reset_data), + ]) + .unwrap(); + let mut sock = ws.lock().expect("websocket mutex poisoned"); + sock.send(Message::Text(reset_event.into())) + .expect("failed to send reset event"); + } - // Send an asciicast-compatible reset event so the server knows the terminal size. - let reset_data = format!("{}x{}", self.terminal.width, self.terminal.height); - let reset_event = serde_json::to_string(&[ - LineItem::F64(0.0), - LineItem::String("r".to_string()), - LineItem::String(reset_data), - ]) - .unwrap(); - ws.send(Message::Text(reset_event.into())) - .expect("failed to send reset event"); + // Keep a read loop alive to respond to Ping/Pong/Close frames from the server. + let ws_reader = ws.clone(); + thread::spawn(move || loop { + let msg = { + let mut sock = ws_reader.lock().expect("websocket mutex poisoned"); + sock.read_message() + }; + + match msg { + Ok(Message::Ping(payload)) => { + if let Ok(mut sock) = ws_reader.lock() { + let _ = sock.send(Message::Pong(payload)); + } + } + Ok(Message::Close(frame)) => { + trace!("server closed stream: {:?}", frame); + break; + } + Ok(_) => {} + Err(tungstenite::Error::Io(ref e)) + if e.kind() == ErrorKind::WouldBlock || e.kind() == ErrorKind::TimedOut => + { + thread::sleep(Duration::from_millis(25)); + } + Err(tungstenite::Error::AlreadyClosed | tungstenite::Error::ConnectionClosed) => { + break; + } + Err(e) => { + error!("websocket read error: {}", e); + break; + } + } + }); let (stdin_tx, stdin_rx) = channel::<(Vec, usize)>(); let (stdout_tx, stdout_rx) = channel::<(Vec, usize)>(); @@ -131,6 +185,7 @@ impl Stream { }); // Stdout thread: read pty output, display it locally, and forward it to the WebSocket. + let ws_writer = ws.clone(); thread::spawn(move || { #[cfg(windows)] let stdout_handle: HANDLE = unsafe { @@ -146,7 +201,9 @@ impl Stream { if len == 0 { trace!("stdout received close indicator"); println!("\nStreaming session ended."); - ws.close(None).ok(); + if let Ok(mut sock) = ws_writer.lock() { + let _ = sock.close(None); + } break; } @@ -174,7 +231,12 @@ impl Stream { ]; let event = serde_json::to_string(&data).unwrap(); - if let Err(e) = ws.send(Message::Text(event.into())) { + let send_result = ws_writer + .lock() + .expect("websocket mutex poisoned") + .send(Message::Text(event.into())); + + if let Err(e) = send_result { error!("failed to send WebSocket message: {}", e); break; } diff --git a/src/main.rs b/src/main.rs index e142017..2a59e69 100644 --- a/src/main.rs +++ b/src/main.rs @@ -238,8 +238,8 @@ fn main() { // Read the current terminal size to inform the server. #[cfg(windows)] let (cols, rows) = { - let t = crate::terminal::WindowsTerminal::new(None); - (t.width as u16, t.height as u16) + crate::terminal::WindowsTerminal::console_size() + .unwrap_or((80u16, 24u16)) }; #[cfg(not(windows))] let (cols, rows) = (80u16, 24u16); @@ -256,7 +256,7 @@ fn main() { } }; - let mut stream = Stream::new(ws_url, stream_url, auth_header, None, command); + let mut stream = Stream::new(ws_url, stream_url, auth_header, command); stream.execute(); } _ => unreachable!(), diff --git a/src/terminal/impl_win/terminal.rs b/src/terminal/impl_win/terminal.rs index 8040c1c..194ccc4 100644 --- a/src/terminal/impl_win/terminal.rs +++ b/src/terminal/impl_win/terminal.rs @@ -61,6 +61,10 @@ impl WindowsTerminal { } } + pub fn console_size() -> Option<(u16, u16)> { + unsafe { WindowsTerminal::get_console_size().ok().map(|(x, y)| (x as u16, y as u16)) } + } + fn create_pseudo_console_and_pipes( handle: &mut HPCON, stdin: &mut HANDLE, // the stdin to write input to PTY From 06a4fbd85db6fc352cf9f5226406c7c56cd89e08 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Tue, 10 Mar 2026 04:44:04 +0000 Subject: [PATCH 4/6] fix: fix CI compilation errors in stream.rs Co-authored-by: ibigbug <543405+ibigbug@users.noreply.github.com> --- src/commands/stream.rs | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/src/commands/stream.rs b/src/commands/stream.rs index 1d7c356..5320d74 100644 --- a/src/commands/stream.rs +++ b/src/commands/stream.rs @@ -75,12 +75,6 @@ impl Stream { .unwrap_or_else(|e| trace!("failed to set non-blocking websocket: {}", e)); } MaybeTlsStream::Rustls(stream) => { - let (_, tcp) = stream.get_mut(); - tcp.set_nonblocking(true) - .unwrap_or_else(|e| trace!("failed to set non-blocking websocket: {}", e)); - } - #[cfg(feature = "native-tls")] - MaybeTlsStream::NativeTls(stream) => { stream .get_mut() .set_nonblocking(true) @@ -108,7 +102,7 @@ impl Stream { thread::spawn(move || loop { let msg = { let mut sock = ws_reader.lock().expect("websocket mutex poisoned"); - sock.read_message() + sock.read() }; match msg { From 5c606619f05b687cde4adc844713549df22243b5 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Tue, 10 Mar 2026 05:47:36 +0000 Subject: [PATCH 5/6] fix: add wildcard arm to non-exhaustive MaybeTlsStream match Co-authored-by: ibigbug <543405+ibigbug@users.noreply.github.com> --- src/commands/stream.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/commands/stream.rs b/src/commands/stream.rs index 5320d74..5428372 100644 --- a/src/commands/stream.rs +++ b/src/commands/stream.rs @@ -80,6 +80,7 @@ impl Stream { .set_nonblocking(true) .unwrap_or_else(|e| trace!("failed to set non-blocking websocket: {}", e)); } + _ => {} } let ws = Arc::new(Mutex::new(ws)); From 8de0984fc939174d4fd1ed5b4e58ed1154bc531e Mon Sep 17 00:00:00 2001 From: "anthropic-code-agent[bot]" <242468646+Claude@users.noreply.github.com> Date: Tue, 10 Mar 2026 17:38:56 +0000 Subject: [PATCH 6/6] fix: address PR review comments and fix CI test compilation - Add missing trait imports (base64::Engine, ApiService) in asciinema.rs tests - Update exit message to be accurate (removed Windows-specific Ctrl+Z instruction) - PR review comments already addressed: console_size helper, WS read loop, env field removed Co-authored-by: ibigbug <543405+ibigbug@users.noreply.github.com> --- Cargo.lock | 41 ++++++++++++++++++++++++++++++++--- src/commands/api/asciinema.rs | 2 ++ src/commands/stream.rs | 2 +- 3 files changed, 41 insertions(+), 4 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index b0c85a6..44f3808 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -229,7 +229,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6f8d983286843e49675a4b7a2d174efe136dc93a18d69130dd18198a6c167601" dependencies = [ "cfg-if", - "cpufeatures", + "cpufeatures 0.3.0", "rand_core 0.10.0", ] @@ -321,6 +321,15 @@ version = "0.8.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "773648b94d0e5d620f64f280777445740e61fe701025087ec8b57f45c791888b" +[[package]] +name = "cpufeatures" +version = "0.2.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "59ed5838eebb26a2bb2e58f6d5b5316989ae9d08bab10e0e6d103e656d1b0280" +dependencies = [ + "libc", +] + [[package]] name = "cpufeatures" version = "0.3.0" @@ -330,6 +339,32 @@ dependencies = [ "libc", ] +[[package]] +name = "crypto-common" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "78c8292055d1c1df0cce5d180393dc8cce0abec0a7102adb6c7b1eef6016d60a" +dependencies = [ + "generic-array", + "typenum", +] + +[[package]] +name = "data-encoding" +version = "2.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d7a1e2f27636f116493b8b860f5546edb47c8d8f8ea73e1d2a20be88e28d1fea" + +[[package]] +name = "digest" +version = "0.10.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9ed9a281f7bc9b7576e61468ba615a66a5c8cfdff42420a70aa82701a3b1e292" +dependencies = [ + "block-buffer", + "crypto-common", +] + [[package]] name = "dirs-next" version = "1.0.2" @@ -1653,7 +1688,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e3bf829a2d51ab4a5ddf1352d8470c140cadc8301b2ae1789db023f01cedd6ba" dependencies = [ "cfg-if", - "cpufeatures", + "cpufeatures 0.2.17", "digest", ] @@ -1942,7 +1977,7 @@ dependencies = [ "http", "httparse", "log", - "rand", + "rand 0.9.1", "rustls", "rustls-pki-types", "sha1", diff --git a/src/commands/api/asciinema.rs b/src/commands/api/asciinema.rs index f73286f..995e0e2 100644 --- a/src/commands/api/asciinema.rs +++ b/src/commands/api/asciinema.rs @@ -257,7 +257,9 @@ impl ApiService for Asciinema { #[cfg(test)] mod tests { use base64::prelude::BASE64_STANDARD; + use base64::Engine; use crate::commands::api::asciinema::Config; + use crate::commands::api::ApiService; use uuid::{Uuid, Version}; diff --git a/src/commands/stream.rs b/src/commands/stream.rs index 5428372..3d4c620 100644 --- a/src/commands/stream.rs +++ b/src/commands/stream.rs @@ -48,7 +48,7 @@ impl Stream { pub fn execute(&mut self) { println!("Streaming. Watch at: {}", self.stream_url); - println!("Exit the shell/command to stop (Ctrl+Z then Enter on Windows)."); + println!("Exit the shell/command to stop streaming."); self.stream(); }