Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/chrometrace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ impl Chrometrace {
Ok(())
}

pub fn write(&self, w: &mut dyn Write) -> Result<(), Error> {
pub fn write<T: Write + ?Sized>(&self, w: &mut T) -> Result<(), Error> {
let mut events = Vec::new();
events.extend(self.events.to_vec());

Expand Down
12 changes: 12 additions & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ pub struct Config {
#[doc(hidden)]
pub sampling_rate: u64,
#[doc(hidden)]
pub save_period: Option<u64>,
#[doc(hidden)]
pub filename: Option<String>,
#[doc(hidden)]
pub format: Option<FileFormat>,
Expand Down Expand Up @@ -125,6 +127,7 @@ impl Default for Config {
blocking: LockingStrategy::Lock,
show_line_numbers: false,
sampling_rate: 100,
save_period: None,
duration: RecordDuration::Unlimited,
native: false,
gil_only: false,
Expand Down Expand Up @@ -249,6 +252,14 @@ impl Config {
.takes_value(true),
)
.arg(rate.clone())
.arg(
Arg::new("save_period")
.long("save_period")
.value_name("save_period")
.value_parser(value_parser!(u64))
.help("The number of intervals between saving the profile to disk")
.takes_value(true),
)
.arg(subprocesses.clone())
.arg(Arg::new("function").short('F').long("function").help(
"Aggregate samples by function's first line number, instead of current line number",
Expand Down Expand Up @@ -372,6 +383,7 @@ impl Config {
match subcommand {
"record" => {
config.sampling_rate = matches.value_of_t("rate")?;
config.save_period = matches.get_one::<u64>("save_period").cloned();
config.duration = match matches.value_of("duration") {
Some("unlimited") | None => RecordDuration::Unlimited,
Some(seconds) => {
Expand Down
4 changes: 2 additions & 2 deletions src/flamegraph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ impl Flamegraph {
.collect()
}

pub fn write(&self, w: &mut dyn Write) -> Result<(), Error> {
pub fn write<T: Write + ?Sized>(&self, w: &mut T) -> Result<(), Error> {
let mut opts = Options::default();
opts.direction = Direction::Inverted;
opts.min_width = 0.1;
Expand All @@ -92,7 +92,7 @@ impl Flamegraph {
Ok(())
}

pub fn write_raw(&self, w: &mut dyn Write) -> Result<(), Error> {
pub fn write_raw<T: Write + ?Sized>(&self, w: &mut T) -> Result<(), Error> {
for line in self.get_lines() {
w.write_all(line.as_bytes())?;
w.write_all(b"\n")?;
Expand Down
107 changes: 71 additions & 36 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ mod timer;
mod utils;
mod version;

use std::io::{Read, Write};
use std::fs::File;
use std::io::{Read, Seek, SeekFrom, Write};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;
Expand Down Expand Up @@ -88,34 +89,51 @@ fn sample_console(pid: remoteprocess::Pid, config: &Config) -> Result<(), Error>

pub trait Recorder {
fn increment(&mut self, trace: &StackTrace) -> Result<(), Error>;
fn write(&self, w: &mut dyn Write) -> Result<(), Error>;
fn write<W: Write>(&self, w: &mut W) -> Result<(), Error>;
fn overwrite(&self, w: &mut File) -> Result<(), Error> {
w.set_len(0)?;
w.seek(SeekFrom::Start(0))?;
self.write(w)?;
w.flush()?;
Ok(())
}
fn ext() -> &'static str;
}

impl Recorder for speedscope::Stats {
fn increment(&mut self, trace: &StackTrace) -> Result<(), Error> {
Ok(self.record(trace)?)
}
fn write(&self, w: &mut dyn Write) -> Result<(), Error> {
fn write<W: Write>(&self, w: &mut W) -> Result<(), Error> {
self.write(w)
}
fn ext() -> &'static str {
"json"
}
}

impl Recorder for flamegraph::Flamegraph {
fn increment(&mut self, trace: &StackTrace) -> Result<(), Error> {
Ok(self.increment(trace)?)
}
fn write(&self, w: &mut dyn Write) -> Result<(), Error> {
fn write<W: Write>(&self, w: &mut W) -> Result<(), Error> {
self.write(w)
}
fn ext() -> &'static str {
"svg"
}
}

impl Recorder for chrometrace::Chrometrace {
fn increment(&mut self, trace: &StackTrace) -> Result<(), Error> {
Ok(self.increment(trace)?)
}
fn write(&self, w: &mut dyn Write) -> Result<(), Error> {
fn write<W: Write>(&self, w: &mut W) -> Result<(), Error> {
self.write(w)
}
fn ext() -> &'static str {
"json"
}
}

pub struct RawFlamegraph(flamegraph::Flamegraph);
Expand All @@ -125,36 +143,24 @@ impl Recorder for RawFlamegraph {
Ok(self.0.increment(trace)?)
}

fn write(&self, w: &mut dyn Write) -> Result<(), Error> {
fn write<W: Write>(&self, w: &mut W) -> Result<(), Error> {
self.0.write_raw(w)
}
}

fn record_samples(pid: remoteprocess::Pid, config: &Config) -> Result<(), Error> {
let mut output: Box<dyn Recorder> = match config.format {
Some(FileFormat::flamegraph) => {
Box::new(flamegraph::Flamegraph::new(config.show_line_numbers))
}
Some(FileFormat::speedscope) => Box::new(speedscope::Stats::new(config)),
Some(FileFormat::raw) => Box::new(RawFlamegraph(flamegraph::Flamegraph::new(
config.show_line_numbers,
))),
Some(FileFormat::chrometrace) => {
Box::new(chrometrace::Chrometrace::new(config.show_line_numbers))
}
None => return Err(format_err!("A file format is required to record samples")),
};
fn ext() -> &'static str {
"txt"
}
}

fn record_samples<R: Recorder>(
pid: remoteprocess::Pid,
config: &Config,
mut output: R,
) -> Result<(), Error> {
let filename = match config.filename.clone() {
Some(filename) => filename,
None => {
let ext = match config.format.as_ref() {
Some(FileFormat::flamegraph) => "svg",
Some(FileFormat::speedscope) => "json",
Some(FileFormat::raw) => "txt",
Some(FileFormat::chrometrace) => "json",
None => return Err(format_err!("A file format is required to record samples")),
};
let ext = R::ext();
let local_time = Local::now().to_rfc3339_opts(SecondsFormat::Secs, true);
let name = match config.python_program.as_ref() {
Some(prog) => prog[0].to_string(),
Expand Down Expand Up @@ -229,7 +235,9 @@ fn record_samples(pid: remoteprocess::Pid, config: &Config) -> Result<(), Error>
let mut exit_message = "Stopped sampling because process exited";
let mut last_late_message = std::time::Instant::now();

for mut sample in sampler {
let mut out_file = std::fs::File::create(&filename)?;

for (i_sample, mut sample) in sampler.into_iter().enumerate() {
if let Some(delay) = sample.late {
if delay > Duration::from_secs(1) {
if config.hide_progress {
Expand Down Expand Up @@ -318,6 +326,12 @@ fn record_samples(pid: remoteprocess::Pid, config: &Config) -> Result<(), Error>
};
progress.set_message(msg);
}

if let Some(save_period) = config.save_period {
if (i_sample + 1) % save_period as usize == 0 {
output.overwrite(&mut out_file)?;
}
}
progress.inc(1);
}
progress.finish();
Expand All @@ -326,10 +340,7 @@ fn record_samples(pid: remoteprocess::Pid, config: &Config) -> Result<(), Error>
println!("\n{}{}", lede, exit_message);
}

{
let mut out_file = std::fs::File::create(&filename)?;
output.write(&mut out_file)?;
}
output.overwrite(&mut out_file)?;

match config.format.as_ref().unwrap() {
FileFormat::flamegraph => {
Expand Down Expand Up @@ -374,9 +385,33 @@ fn run_spy_command(pid: remoteprocess::Pid, config: &config::Config) -> Result<(
"dump" => {
dump::print_traces(pid, config, None)?;
}
"record" => {
record_samples(pid, config)?;
}
"record" => match config.format {
Some(FileFormat::flamegraph) => {
record_samples(
pid,
config,
flamegraph::Flamegraph::new(config.show_line_numbers),
)?;
}
Some(FileFormat::speedscope) => {
record_samples(pid, config, speedscope::Stats::new(config))?;
}
Some(FileFormat::raw) => {
record_samples(
pid,
config,
RawFlamegraph(flamegraph::Flamegraph::new(config.show_line_numbers)),
)?;
}
Some(FileFormat::chrometrace) => {
record_samples(
pid,
config,
chrometrace::Chrometrace::new(config.show_line_numbers),
)?;
}
None => return Err(format_err!("A file format is required to record samples")),
},
"top" => {
sample_console(pid, config)?;
}
Expand Down
2 changes: 1 addition & 1 deletion src/speedscope.rs
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ impl Stats {
Ok(())
}

pub fn write(&self, w: &mut dyn Write) -> Result<(), Error> {
pub fn write<T: Write + ?Sized>(&self, w: &mut T) -> Result<(), Error> {
let json = serde_json::to_string(&SpeedscopeFile::new(
&self.samples,
&self.frames,
Expand Down
50 changes: 49 additions & 1 deletion tests/integration_test.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
from __future__ import print_function

import concurrent.futures
import json
import os
import re
import subprocess
import sys
import re
import tempfile
import time
import unittest
from collections import defaultdict, namedtuple
from pathlib import Path
from shutil import which

Frame = namedtuple("Frame", ["file", "name", "line", "col"])
Expand Down Expand Up @@ -119,6 +122,51 @@ def test_shell_completions(self):
cmdline = [PYSPY, "completions", "bash"]
subprocess.check_output(cmdline)

def test_watch(self):
with tempfile.NamedTemporaryFile() as profile_file:
filename = profile_file.name
cmdline = [
PYSPY,
"record",
"--save_period",
"1",
"--duration",
"1",
"--format",
"chrometrace",
"-o",
filename,
"--",
"python",
_get_script("busyloop.py"),
]
with concurrent.futures.ThreadPoolExecutor() as executor:
future = executor.submit(self.__watch_change, filename, 1.01)
subprocess.check_output(cmdline)
records = [
len(record) for _, record in future.result()
]
# assert we have a few records
assert len(records) > 50
# assert that the number of samples is increasing
assert all(records[i] <= records[i + 1] for i in range(len(records) - 1))

@staticmethod
def __watch_change(filename: str, duration: float):
"""Poor man's fswatch"""
records = []

file = Path(filename)
start = time.time()

while time.time() <= start + duration:
if file.rename(filename).exists() and (content:=file.read_text()).endswith("]\n"):
last_modify = file.stat().st_mtime
if (not records) or (records[-1][0] != last_modify):
records.append((last_modify, json.loads(content)))

return records


def _get_script(name):
base_dir = os.path.dirname(__file__)
Expand Down