diff --git a/src/commands/play.rs b/src/commands/play.rs index 4a6031b..997aab1 100644 --- a/src/commands/play.rs +++ b/src/commands/play.rs @@ -7,8 +7,40 @@ use std::io::{BufRead, Write}; use std::path::Path; use std::process::exit; -use std::sync::{Condvar, Mutex}; -use std::time::Duration; +use std::sync::{Arc, Condvar, Mutex}; +use std::thread; +use std::time::{Duration, Instant}; + +#[cfg(windows)] +use log::warn; + +#[cfg(windows)] +use windows::Win32::{ + Foundation::HANDLE, + Storage::FileSystem::ReadFile, + System::Console::{ + GetConsoleMode, GetStdHandle, SetConsoleMode, CONSOLE_MODE, ENABLE_ECHO_INPUT, + ENABLE_LINE_INPUT, STD_INPUT_HANDLE, + }, +}; + +#[cfg(windows)] +struct ConsoleGuard { + handle: HANDLE, + original_mode: CONSOLE_MODE, +} + +#[cfg(windows)] +impl Drop for ConsoleGuard { + fn drop(&mut self) { + unsafe { + // Best-effort restoration; log any failure but don't panic + if let Err(e) = SetConsoleMode(self.handle, self.original_mode) { + eprintln!("Warning: failed to restore console mode: {:?}", e); + } + } + } +} enum SessionLineSource { Lines(io::Lines>), @@ -29,7 +61,8 @@ impl Iterator for StdoutIter { fn next(&mut self) -> Option { match &mut self.0.line_iter { SessionLineSource::Vec(iter) => iter.next(), - SessionLineSource::Lines(iter) => iter.next().map(|line| { + SessionLineSource::Lines(iter) => loop { + let line = iter.next()?; let content = match line { Ok(l) => l, Err(e) => { @@ -37,6 +70,10 @@ impl Iterator for StdoutIter { exit(1); } }; + // Skip empty or whitespace-only lines (e.g. trailing newlines in files) + if content.trim().is_empty() { + continue; + } let line_data: Vec = match serde_json::from_str(&content) { Ok(data) => data, Err(e) => { @@ -49,7 +86,7 @@ impl Iterator for StdoutIter { exit(1); } - SessionLine { + let session_line = SessionLine { timestamp: match &line_data[0] { LineItem::F64(ts) => ts.clone(), _ => { @@ -71,8 +108,9 @@ impl Iterator for StdoutIter { exit(1); } }, - } - }), + }; + return Some(session_line); + }, } } } @@ -161,10 +199,28 @@ fn parse_reader(reader: Box, source_name: &str) -> Session { }; if let Ok(header) = serde_json::from_str::(&first_line) { - // v2 format: header on first line, events stream on subsequent lines - Session { - header, - line_iter: SessionLineSource::Lines(line_iter), + // v2 format: header on first line, events stream on subsequent lines. + // Validate version == 2 to avoid misclassifying v1 recordings that + // happen to contain a timestamp field parseable as RecordHeader. + if header.version == 2 { + Session { + header, + line_iter: SessionLineSource::Lines(line_iter), + } + } else { + // Not v2 — fall through and try v1 parsing with the full content. + let mut file_content = first_line; + for line in line_iter { + file_content.push('\n'); + match line { + Ok(l) => file_content.push_str(&l), + Err(e) => { + eprintln!("error reading '{}': {}", source_name, e); + exit(1); + } + } + } + parse_v1(source_name, file_content) } } else { // Try v1 format: entire content is a single JSON object. @@ -180,50 +236,54 @@ fn parse_reader(reader: Box, source_name: &str) -> Session { } } } - match serde_json::from_str::(&file_content) { - Ok(recording) if recording.version == 1 => { - let header = RecordHeader { - version: recording.version, - width: recording.width, - height: recording.height, - timestamp: 0, - environment: HashMap::new(), - }; + parse_v1(source_name, file_content) + } +} - let mut absolute_time: f64 = 0.0; - let events: Vec = recording - .stdout - .into_iter() - .map(|(delay, text)| { - absolute_time += delay; - SessionLine { - timestamp: absolute_time, - stdout: true, - content: text, - } - }) - .collect(); +fn parse_v1(source_name: &str, file_content: String) -> Session { + match serde_json::from_str::(&file_content) { + Ok(recording) if recording.version == 1 => { + let header = RecordHeader { + version: recording.version, + width: recording.width, + height: recording.height, + timestamp: 0, + environment: HashMap::new(), + }; - Session { - header, - line_iter: SessionLineSource::Vec(events.into_iter()), - } - } - Ok(recording) => { - eprintln!( - "'{}': unsupported file format version {}", - source_name, recording.version - ); - exit(1); - } - Err(e) => { - eprintln!( - "'{}': unsupported or corrupt session file: {}", - source_name, e - ); - exit(1); + let mut absolute_time: f64 = 0.0; + let events: Vec = recording + .stdout + .into_iter() + .map(|(delay, text)| { + absolute_time += delay; + SessionLine { + timestamp: absolute_time, + stdout: true, + content: text, + } + }) + .collect(); + + Session { + header, + line_iter: SessionLineSource::Vec(events.into_iter()), } } + Ok(recording) => { + eprintln!( + "'{}': unsupported file format version {}", + source_name, recording.version + ); + exit(1); + } + Err(e) => { + eprintln!( + "'{}': unsupported or corrupt session file: {}", + source_name, e + ); + exit(1); + } } } @@ -272,6 +332,36 @@ impl Session { } } +/// Waits for `delay` seconds, respecting pause/resume signals from `pair`. +/// Returns once the full delay has elapsed (accounting for time spent paused). +fn wait_interruptible(pair: &Arc<(Mutex, Condvar)>, delay: f64) { + let mut remaining = delay; + while remaining > 1e-9 { + let (lock, cvar) = &**pair; + let mut paused_guard = lock.lock().unwrap(); + + // Block while paused. + while *paused_guard { + paused_guard = cvar.wait(paused_guard).unwrap(); + } + + // Wait for the remaining delay; an early wakeup means pause was toggled. + let start = Instant::now(); + let (_guard, timed_out) = cvar + .wait_timeout(paused_guard, Duration::from_secs_f64(remaining)) + .unwrap(); + + let elapsed = start.elapsed().as_secs_f64(); + remaining = (remaining - elapsed).max(0.0); + + if timed_out.timed_out() { + break; + } + // Woken early because pause was toggled — loop back to re-check + // the paused state and continue with the remaining delay. + } +} + pub struct Play { session: Session, idle_time_limit: Option, @@ -288,29 +378,129 @@ impl Play { } pub fn execute(self) { - let cond = Condvar::new(); - let g = Mutex::new(false); - #[allow(unused_must_use)] - for stdout in self.session.stdout_relative_time_iter() { - let mut delay = stdout.timestamp; + // Shared pause state: (paused flag, condvar to signal changes) + let pair = Arc::new((Mutex::new(false), Condvar::new())); + let pair_clone = Arc::clone(&pair); + + // On Windows: save stdin console mode and switch to raw (no line + // buffering / no echo) so space arrives immediately without Enter. + // We use an RAII guard to ensure console mode is restored on all exit paths. + #[cfg(windows)] + let _console_guard: Option = unsafe { + match GetStdHandle(STD_INPUT_HANDLE) { + Ok(h) if !h.is_invalid() => { + let mut orig = CONSOLE_MODE::default(); + if GetConsoleMode(h, &mut orig).is_err() { + warn!( + "pause: stdin is not a console (redirected?); \ + space-to-pause is disabled" + ); + None + } else { + let mut raw = orig; + raw &= !ENABLE_LINE_INPUT; + raw &= !ENABLE_ECHO_INPUT; + if let Err(e) = SetConsoleMode(h, raw) { + warn!("pause: failed to set console mode: {:?}; space-to-pause is disabled", e); + None + } else { + Some(ConsoleGuard { + handle: h, + original_mode: orig, + }) + } + } + } + _ => { + warn!( + "pause: could not obtain a valid stdin handle; \ + space-to-pause is disabled" + ); + None + } + } + }; + + // Spawn a thread that reads stdin and toggles pause when space is pressed. + // Only spawn the thread when pause support is actually enabled. + #[cfg(windows)] + if _console_guard.is_some() { + thread::spawn(move || { + unsafe { + let stdin_handle = match GetStdHandle(STD_INPUT_HANDLE) { + Ok(h) if !h.is_invalid() => h, + _ => return, + }; + loop { + let mut buf = [0u8; 1]; + let mut n_read: u32 = 0; + if ReadFile(stdin_handle, Some(&mut buf), Some(&mut n_read), None).is_err() + || n_read == 0 + { + break; + } + if buf[0] == b' ' { + let (lock, cvar) = &*pair_clone; + let mut paused = lock.lock().unwrap(); + *paused = !*paused; + cvar.notify_all(); + } + } + } + }); + } + + #[cfg(not(windows))] + { + use std::io::IsTerminal; + // Only enable pause when stdin is an interactive terminal; don't + // consume piped input in tests or scripted environments. + if std::io::stdin().is_terminal() { + thread::spawn(move || { + use std::io::Read; + loop { + let mut buf = [0u8; 1]; + match std::io::stdin().lock().read(&mut buf) { + Ok(1) if buf[0] == b' ' => { + let (lock, cvar) = &*pair_clone; + let mut paused = lock.lock().unwrap(); + *paused = !*paused; + cvar.notify_all(); + } + Ok(n) if n > 0 => {} + _ => break, + } + } + }); + } + } + + for stdout_item in self.session.stdout_relative_time_iter() { + let mut delay = stdout_item.timestamp; if let Some(limit) = self.idle_time_limit { delay = delay.min(limit); } delay /= self.speed; - cond.wait_timeout(g.lock().unwrap(), Duration::from_secs_f64(delay)) + + wait_interruptible(&pair, delay); + + io::stdout() + .write_all(stdout_item.content.as_bytes()) .unwrap(); - io::stdout().write_all(stdout.content.as_bytes()).unwrap(); io::stdout().flush().unwrap(); } + // Console mode is automatically restored by the ConsoleGuard's Drop impl } } #[cfg(test)] mod tests { + use super::{is_url, normalize_url, wait_interruptible}; use crate::Play; use std::path::PathBuf; - - use super::{is_url, normalize_url}; + use std::sync::{Arc, Condvar, Mutex}; + use std::thread; + use std::time::{Duration, Instant}; fn test_data_path() -> String { let mut d = PathBuf::from(env!("CARGO_MANIFEST_DIR")); @@ -366,6 +556,49 @@ mod tests { play.execute(); } + /// Verify that wait_interruptible completes without pause after the + /// requested delay. + #[test] + fn test_wait_interruptible_no_pause() { + let pair = Arc::new((Mutex::new(false), Condvar::new())); + let start = Instant::now(); + wait_interruptible(&pair, 0.05); + assert!(start.elapsed() >= Duration::from_millis(30)); + } + + /// Verify that wait_interruptible blocks while paused and resumes correctly + /// once the pause is lifted, accounting for the time spent paused. + #[test] + fn test_wait_interruptible_pause_and_resume() { + let pair = Arc::new((Mutex::new(false), Condvar::new())); + let pair_clone = Arc::clone(&pair); + + // Start paused immediately. + { + let (lock, cvar) = &*pair; + let mut paused = lock.lock().unwrap(); + *paused = true; + cvar.notify_all(); + } + + // Lift the pause after 50 ms. + thread::spawn(move || { + thread::sleep(Duration::from_millis(50)); + let (lock, cvar) = &*pair_clone; + let mut paused = lock.lock().unwrap(); + *paused = false; + cvar.notify_all(); + }); + + let start = Instant::now(); + // Tiny delay — execution is dominated by the 50 ms pause period. + wait_interruptible(&pair, 0.001); + let elapsed = start.elapsed(); + + // Must have waited at least ~50 ms for the resume signal. + assert!(elapsed >= Duration::from_millis(30)); + } + #[test] fn test_is_url() { assert!(is_url("https://asciinema.org/a/abc123"));