diff --git a/src/chrometrace.rs b/src/chrometrace.rs index 24463800..c51cca97 100644 --- a/src/chrometrace.rs +++ b/src/chrometrace.rs @@ -104,7 +104,7 @@ impl Chrometrace { Ok(()) } - pub fn write(&self, w: &mut dyn Write) -> Result<(), Error> { + pub fn write(&self, w: &mut T) -> Result<(), Error> { let mut events = Vec::new(); events.extend(self.events.to_vec()); diff --git a/src/config.rs b/src/config.rs index de742d28..4c8a70d2 100644 --- a/src/config.rs +++ b/src/config.rs @@ -28,6 +28,8 @@ pub struct Config { #[doc(hidden)] pub sampling_rate: u64, #[doc(hidden)] + pub save_period: Option, + #[doc(hidden)] pub filename: Option, #[doc(hidden)] pub format: Option, @@ -125,6 +127,7 @@ impl Default for Config { blocking: LockingStrategy::Lock, show_line_numbers: false, sampling_rate: 100, + save_period: None, duration: RecordDuration::Unlimited, native: false, gil_only: false, @@ -249,6 +252,14 @@ impl Config { .takes_value(true), ) .arg(rate.clone()) + .arg( + Arg::new("save_period") + .long("save_period") + .value_name("save_period") + .value_parser(value_parser!(u64)) + .help("The number of intervals between saving the profile to disk") + .takes_value(true), + ) .arg(subprocesses.clone()) .arg(Arg::new("function").short('F').long("function").help( "Aggregate samples by function's first line number, instead of current line number", @@ -372,6 +383,7 @@ impl Config { match subcommand { "record" => { config.sampling_rate = matches.value_of_t("rate")?; + config.save_period = matches.get_one::("save_period").cloned(); config.duration = match matches.value_of("duration") { Some("unlimited") | None => RecordDuration::Unlimited, Some(seconds) => { diff --git a/src/flamegraph.rs b/src/flamegraph.rs index 795f0503..99d67537 100644 --- a/src/flamegraph.rs +++ b/src/flamegraph.rs @@ -80,7 +80,7 @@ impl Flamegraph { .collect() } - pub fn write(&self, w: &mut dyn Write) -> Result<(), Error> { + pub fn write(&self, w: &mut T) -> Result<(), Error> { let mut opts = Options::default(); opts.direction = Direction::Inverted; opts.min_width = 0.1; @@ -92,7 +92,7 @@ impl Flamegraph { Ok(()) } - pub fn write_raw(&self, w: &mut dyn Write) -> Result<(), Error> { + pub fn write_raw(&self, w: &mut T) -> Result<(), Error> { for line in self.get_lines() { w.write_all(line.as_bytes())?; w.write_all(b"\n")?; diff --git a/src/main.rs b/src/main.rs index df0d602c..92fb07f5 100644 --- a/src/main.rs +++ b/src/main.rs @@ -28,7 +28,8 @@ mod timer; mod utils; mod version; -use std::io::{Read, Write}; +use std::fs::File; +use std::io::{Read, Seek, SeekFrom, Write}; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use std::time::Duration; @@ -88,34 +89,51 @@ fn sample_console(pid: remoteprocess::Pid, config: &Config) -> Result<(), Error> pub trait Recorder { fn increment(&mut self, trace: &StackTrace) -> Result<(), Error>; - fn write(&self, w: &mut dyn Write) -> Result<(), Error>; + fn write(&self, w: &mut W) -> Result<(), Error>; + fn overwrite(&self, w: &mut File) -> Result<(), Error> { + w.set_len(0)?; + w.seek(SeekFrom::Start(0))?; + self.write(w)?; + w.flush()?; + Ok(()) + } + fn ext() -> &'static str; } impl Recorder for speedscope::Stats { fn increment(&mut self, trace: &StackTrace) -> Result<(), Error> { Ok(self.record(trace)?) } - fn write(&self, w: &mut dyn Write) -> Result<(), Error> { + fn write(&self, w: &mut W) -> Result<(), Error> { self.write(w) } + fn ext() -> &'static str { + "json" + } } impl Recorder for flamegraph::Flamegraph { fn increment(&mut self, trace: &StackTrace) -> Result<(), Error> { Ok(self.increment(trace)?) } - fn write(&self, w: &mut dyn Write) -> Result<(), Error> { + fn write(&self, w: &mut W) -> Result<(), Error> { self.write(w) } + fn ext() -> &'static str { + "svg" + } } impl Recorder for chrometrace::Chrometrace { fn increment(&mut self, trace: &StackTrace) -> Result<(), Error> { Ok(self.increment(trace)?) } - fn write(&self, w: &mut dyn Write) -> Result<(), Error> { + fn write(&self, w: &mut W) -> Result<(), Error> { self.write(w) } + fn ext() -> &'static str { + "json" + } } pub struct RawFlamegraph(flamegraph::Flamegraph); @@ -125,36 +143,24 @@ impl Recorder for RawFlamegraph { Ok(self.0.increment(trace)?) } - fn write(&self, w: &mut dyn Write) -> Result<(), Error> { + fn write(&self, w: &mut W) -> Result<(), Error> { self.0.write_raw(w) } -} -fn record_samples(pid: remoteprocess::Pid, config: &Config) -> Result<(), Error> { - let mut output: Box = match config.format { - Some(FileFormat::flamegraph) => { - Box::new(flamegraph::Flamegraph::new(config.show_line_numbers)) - } - Some(FileFormat::speedscope) => Box::new(speedscope::Stats::new(config)), - Some(FileFormat::raw) => Box::new(RawFlamegraph(flamegraph::Flamegraph::new( - config.show_line_numbers, - ))), - Some(FileFormat::chrometrace) => { - Box::new(chrometrace::Chrometrace::new(config.show_line_numbers)) - } - None => return Err(format_err!("A file format is required to record samples")), - }; + fn ext() -> &'static str { + "txt" + } +} +fn record_samples( + pid: remoteprocess::Pid, + config: &Config, + mut output: R, +) -> Result<(), Error> { let filename = match config.filename.clone() { Some(filename) => filename, None => { - let ext = match config.format.as_ref() { - Some(FileFormat::flamegraph) => "svg", - Some(FileFormat::speedscope) => "json", - Some(FileFormat::raw) => "txt", - Some(FileFormat::chrometrace) => "json", - None => return Err(format_err!("A file format is required to record samples")), - }; + let ext = R::ext(); let local_time = Local::now().to_rfc3339_opts(SecondsFormat::Secs, true); let name = match config.python_program.as_ref() { Some(prog) => prog[0].to_string(), @@ -229,7 +235,9 @@ fn record_samples(pid: remoteprocess::Pid, config: &Config) -> Result<(), Error> let mut exit_message = "Stopped sampling because process exited"; let mut last_late_message = std::time::Instant::now(); - for mut sample in sampler { + let mut out_file = std::fs::File::create(&filename)?; + + for (i_sample, mut sample) in sampler.into_iter().enumerate() { if let Some(delay) = sample.late { if delay > Duration::from_secs(1) { if config.hide_progress { @@ -318,6 +326,12 @@ fn record_samples(pid: remoteprocess::Pid, config: &Config) -> Result<(), Error> }; progress.set_message(msg); } + + if let Some(save_period) = config.save_period { + if (i_sample + 1) % save_period as usize == 0 { + output.overwrite(&mut out_file)?; + } + } progress.inc(1); } progress.finish(); @@ -326,10 +340,7 @@ fn record_samples(pid: remoteprocess::Pid, config: &Config) -> Result<(), Error> println!("\n{}{}", lede, exit_message); } - { - let mut out_file = std::fs::File::create(&filename)?; - output.write(&mut out_file)?; - } + output.overwrite(&mut out_file)?; match config.format.as_ref().unwrap() { FileFormat::flamegraph => { @@ -374,9 +385,33 @@ fn run_spy_command(pid: remoteprocess::Pid, config: &config::Config) -> Result<( "dump" => { dump::print_traces(pid, config, None)?; } - "record" => { - record_samples(pid, config)?; - } + "record" => match config.format { + Some(FileFormat::flamegraph) => { + record_samples( + pid, + config, + flamegraph::Flamegraph::new(config.show_line_numbers), + )?; + } + Some(FileFormat::speedscope) => { + record_samples(pid, config, speedscope::Stats::new(config))?; + } + Some(FileFormat::raw) => { + record_samples( + pid, + config, + RawFlamegraph(flamegraph::Flamegraph::new(config.show_line_numbers)), + )?; + } + Some(FileFormat::chrometrace) => { + record_samples( + pid, + config, + chrometrace::Chrometrace::new(config.show_line_numbers), + )?; + } + None => return Err(format_err!("A file format is required to record samples")), + }, "top" => { sample_console(pid, config)?; } diff --git a/src/speedscope.rs b/src/speedscope.rs index 3cc0725c..a5871635 100644 --- a/src/speedscope.rs +++ b/src/speedscope.rs @@ -253,7 +253,7 @@ impl Stats { Ok(()) } - pub fn write(&self, w: &mut dyn Write) -> Result<(), Error> { + pub fn write(&self, w: &mut T) -> Result<(), Error> { let json = serde_json::to_string(&SpeedscopeFile::new( &self.samples, &self.frames, diff --git a/tests/integration_test.py b/tests/integration_test.py index 023e098c..4e47e09c 100644 --- a/tests/integration_test.py +++ b/tests/integration_test.py @@ -1,13 +1,16 @@ from __future__ import print_function +import concurrent.futures import json import os +import re import subprocess import sys -import re import tempfile +import time import unittest from collections import defaultdict, namedtuple +from pathlib import Path from shutil import which Frame = namedtuple("Frame", ["file", "name", "line", "col"]) @@ -119,6 +122,51 @@ def test_shell_completions(self): cmdline = [PYSPY, "completions", "bash"] subprocess.check_output(cmdline) + def test_watch(self): + with tempfile.NamedTemporaryFile() as profile_file: + filename = profile_file.name + cmdline = [ + PYSPY, + "record", + "--save_period", + "1", + "--duration", + "1", + "--format", + "chrometrace", + "-o", + filename, + "--", + "python", + _get_script("busyloop.py"), + ] + with concurrent.futures.ThreadPoolExecutor() as executor: + future = executor.submit(self.__watch_change, filename, 1.01) + subprocess.check_output(cmdline) + records = [ + len(record) for _, record in future.result() + ] + # assert we have a few records + assert len(records) > 50 + # assert that the number of samples is increasing + assert all(records[i] <= records[i + 1] for i in range(len(records) - 1)) + + @staticmethod + def __watch_change(filename: str, duration: float): + """Poor man's fswatch""" + records = [] + + file = Path(filename) + start = time.time() + + while time.time() <= start + duration: + if file.rename(filename).exists() and (content:=file.read_text()).endswith("]\n"): + last_modify = file.stat().st_mtime + if (not records) or (records[-1][0] != last_modify): + records.append((last_modify, json.loads(content))) + + return records + def _get_script(name): base_dir = os.path.dirname(__file__)