diff --git a/libdd-profiling/benches/add_samples.rs b/libdd-profiling/benches/add_samples.rs index 4edcd94dd1..21fdddb375 100644 --- a/libdd-profiling/benches/add_samples.rs +++ b/libdd-profiling/benches/add_samples.rs @@ -47,6 +47,27 @@ fn make_stack_api2(frames: &[Frame2]) -> (Vec, Vec) { (locations, values) } +fn make_timestamped_profile( + sample_types: &[api::SampleType], + frames: &[Frame], + labels: &[api::Label<'static>], +) -> profiling::internal::Profile { + let mut profile = profiling::internal::Profile::try_new(sample_types, None).unwrap(); + let (locations, values) = make_stack_api(frames); + + for i in 0..1000 { + let sample = api::Sample { + locations: locations.clone(), + values: &values, + labels: labels.to_vec(), + }; + let ts = std::num::NonZeroI64::new(i + 1); + black_box(profile.try_add_sample(sample, ts)).unwrap(); + } + + profile +} + #[derive(Clone, Copy)] struct Frame { file_name: &'static str, @@ -189,6 +210,23 @@ pub fn bench_add_sample_vs_add2(c: &mut Criterion) { black_box(profile.only_for_testing_num_aggregated_samples()) }) }); + + c.bench_function( + "profile_serialize_compressed_pprof_timestamped_x1000", + |b| { + b.iter_batched( + || { + make_timestamped_profile( + &sample_types, + frames.as_slice(), + labels_api.as_slice(), + ) + }, + |profile| black_box(profile.serialize_into_compressed_pprof(None, None)).unwrap(), + BatchSize::SmallInput, + ) + }, + ); } criterion_group!(benches, bench_add_sample_vs_add2); diff --git a/libdd-profiling/src/profiles/compressor.rs b/libdd-profiling/src/profiles/compressor.rs index 4433698702..e7b8a73556 100644 --- a/libdd-profiling/src/profiles/compressor.rs +++ b/libdd-profiling/src/profiles/compressor.rs @@ -1,7 +1,7 @@ // Copyright 2025-Present Datadog, Inc. https://www.datadoghq.com/ // SPDX-License-Identifier: Apache-2.0 -use std::io::{self, Read, Write}; +use std::io::{self, BufWriter, Read, Write}; /// This type wraps a [`Vec`] to provide a [`Write`] interface that has a max /// capacity that won't be exceeded. Additionally, it gracefully handles @@ -90,6 +90,10 @@ pub trait ProfileCodec { ) -> io::Result; fn finish(encoder: Self::Encoder) -> io::Result>; + + /// Returns the recommended input buffer size for the encoder. + /// Used to size the `BufWriter` that wraps the encoder. + fn recommended_input_buf_size() -> usize; } #[allow(unused)] @@ -109,6 +113,10 @@ impl ProfileCodec for NoopProfileCodec { fn finish(encoder: Self::Encoder) -> io::Result> { Ok(encoder.into()) } + + fn recommended_input_buf_size() -> usize { + 0 + } } #[allow(unused)] @@ -132,6 +140,10 @@ impl ProfileCodec for ZstdProfileCodec { Err((_enc, error)) => Err(error), } } + + fn recommended_input_buf_size() -> usize { + zstd::Encoder::::recommended_input_size() + } } #[cfg(not(miri))] @@ -202,7 +214,7 @@ pub type DefaultObservationCodec = NoopObservationCodec; /// Used to compress profile data. pub struct Compressor { - encoder: C::Encoder, + encoder: BufWriter, } impl Compressor { @@ -218,13 +230,17 @@ impl Compressor { compression_level: i32, ) -> io::Result> { Ok(Compressor { - encoder: C::new_encoder(size_hint, max_capacity, compression_level)?, + encoder: BufWriter::with_capacity( + C::recommended_input_buf_size(), + C::new_encoder(size_hint, max_capacity, compression_level)?, + ), }) } /// Finish the compression, and return the compressed data. pub fn finish(self) -> io::Result> { - C::finish(self.encoder) + let encoder = self.encoder.into_inner().map_err(|e| e.into_error())?; + C::finish(encoder) } }