Commit dc9d8de

vladimir-dd and Claude authored
chore(datadog_metrics sink): switch to v2 endpoint (#24842)
* chore(datadog_metrics sink): switch to Datadog Metrics V2 endpoint

* chore(regression): Add statsd_to_datadog_metrics performance test

  Add a regression test to validate datadog_metrics sink v2 endpoint performance
  under realistic high-throughput DogStatsD load.

  Test configuration:
  - Load: default lading DogStatsD settings (realistic ~2 KB messages)
  - Throughput: 500 Mb/s → ~250k events/sec
  - Batch: default settings (100k max_events, 2 s timeout)
  - Validates batch splitting when payloads exceed v2 size limits

  This test ensures the v2 endpoint correctly handles batch splitting with
  realistic high-cardinality DogStatsD metrics under load.

* fix(datadog_metrics sink): apply per-endpoint batch size limit

  Different series endpoints have different uncompressed payload limits (v2 is
  12x smaller than v1). This ensures each batch fits in a single HTTP request
  without splitting, reducing memory overhead.

* fix(datadog_metrics sink): fix test compilation errors in encoder and config

  - Remove references to DatadogMetricsCompression and request_compression from
    encoder.rs tests (those symbols don't exist in the current codebase; they
    belong to an unmerged compression-options branch)
  - Fix the batcher_user_max_bytes_is_preserved test to avoid struct update
    syntax with private PhantomData fields in BatchConfig

* chore(datadog_metrics sink): warn on deprecated
  VECTOR_TEMP_USE_DD_METRICS_SERIES_V2_API env var

  The old opt-in env var is now a no-op since v2 is the default. Emit a
  one-time warning so existing users know they can safely remove it.

* trigger build

* feat(datadog_metrics sink): use separate batch size limits for Series and
  Sketches

  Series v2 has a 5 MiB uncompressed payload limit while Sketches allows
  60 MiB. Previously both used the same (Series-derived) cap, which
  over-fragmented sketch-heavy workloads into many small requests.

  To support per-partition batch configuration, `PartitionedBatcher` now passes
  the partition key to the batch config factory closure (`Fn(&Key) -> C`
  instead of `Fn() -> C`). An explicit `timeout: Duration` parameter replaces
  the previous extraction via `settings().timeout()`. All existing callers are
  updated mechanically.

  The `datadog_metrics` sink uses the new capability to select the appropriate
  byte size limit per endpoint partition, keeping Series at 5 MiB and Sketches
  at 60 MiB.

* fix(datadog_metrics sink): fix CI failures after partitioned batcher refactor

  - Add a `Batch = B` constraint to the `with_timer` test constructor so the
    compiler can infer `B` from the batch config type
  - Replace `Box::new(move |_| ...)` with unboxed `move |_: &u8| ...` in tests
    to avoid HRTB inference issues with `Fn(&Key) -> C`
  - Run `make fmt` to fix formatting in several sink files

* feat(datadog_metrics sink): add series_api_version config field, remove env
  var support

  - Add a `series_api_version` config option (`v1` | `v2`, default `v2`) to
    `DatadogMetricsConfig`
  - Make `SeriesApiVersion` a proper `#[configurable_component]` enum with
    serde support
  - Remove the `VECTOR_TEMP_USE_DD_METRICS_SERIES_V1_API` and
    `VECTOR_TEMP_USE_DD_METRICS_SERIES_V2_API` env vars
  - Remove the `get_api_version_backwards_compatible()` and
    `DatadogMetricsEndpoint::series()` helpers
  - Thread `series_api_version` through config, sink, partitioner, and request
    builder
  - Add a `v1_batch_config_uses_v1_size_limit` test; update existing tests to
    pass an explicit version

  Rationale: replace the hidden, process-global env var mechanism with a
  standard config field so users can control the series API version per-sink
  without a full Vector restart, and so the option is properly documented
  alongside other sink settings.

* update docs

* mark v1 as deprecated

* fix(datadog_metrics sink): fix gauge interval assertion after normalization
  strips interval_ms

  `into_absolute()` always sets `interval_ms: None`, so the encoded interval
  for gauges is always 0. The test expectation of 10 was dead code until v2
  became the default endpoint.

* test(datadog_metrics sink): validate both v1 and v2 series endpoints in e2e
  tests

  - Add a series_api_version matrix dimension ['v1', 'v2'] to test.yaml,
    expanding coverage from 3 to 6 environments
  - Pass CONFIG_SERIES_API_VERSION through the docker compose environment to
    the Vector container
  - Parameterize series_api_version in vector.toml from the env var
  - Refactor series::validate() to dispatch to the v1 or v2 fetch function
    based on CONFIG_SERIES_API_VERSION
  - Extract assertions into a compare_intakes() helper (no new assertion logic)
  - Replace the blocking std::thread::sleep with an async tokio::time::sleep in
    the test entry point

  Rationale: Vector now supports a configurable series_api_version (v1/v2) in
  the datadog_metrics sink. The e2e test previously hardcoded v1 for the
  vector pipeline. This change runs the same assertions against both API
  versions via the CI matrix, ensuring correctness is validated for each
  version independently without duplicating test logic.

---------

Co-authored-by: Claude Sonnet 4.5 <noreply@anthropic.com>
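The core refactor above, passing the partition key into the batch-config factory, can be sketched in isolation. This is a simplified, hypothetical model: the `Endpoint` and `BatchLimits` types are invented for illustration and are not Vector's actual types; only the `Fn(&Key) -> C` closure shape and the 5 MiB / 60 MiB limits come from the commit.

```rust
use std::collections::HashMap;

// Hypothetical partition key: which Datadog endpoint a metric batches toward.
#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
enum Endpoint {
    SeriesV2,
    Sketches,
}

// Hypothetical per-batch config produced by the factory closure.
#[derive(Debug, PartialEq)]
struct BatchLimits {
    max_bytes: usize,
}

// Mirrors the new `Fn(&Key) -> C` factory shape: the batcher invokes the
// closure with the partition key the first time it sees that partition.
fn batch_for_partition<F>(keys: &[Endpoint], factory: F) -> HashMap<Endpoint, BatchLimits>
where
    F: Fn(&Endpoint) -> BatchLimits,
{
    let mut batches = HashMap::new();
    for key in keys {
        batches.entry(*key).or_insert_with(|| factory(key));
    }
    batches
}

fn main() {
    // Per-endpoint limits from the commit message: Series v2 is 5 MiB,
    // Sketches is 60 MiB (uncompressed).
    let batches = batch_for_partition(&[Endpoint::SeriesV2, Endpoint::Sketches], |key| {
        let max_bytes = match key {
            Endpoint::SeriesV2 => 5 * 1024 * 1024,
            Endpoint::Sketches => 60 * 1024 * 1024,
        };
        BatchLimits { max_bytes }
    });

    assert_eq!(batches[&Endpoint::SeriesV2].max_bytes, 5_242_880);
    assert_eq!(batches[&Endpoint::Sketches].max_bytes, 62_914_560);
    println!("per-partition limits resolved");
}
```

With the old `Fn() -> C` shape, the closure could not observe the key, so every partition was forced onto a single shared limit; threading the key through is what makes the Series/Sketches split possible.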
1 parent 83bcf4c commit dc9d8de

32 files changed: 382 additions & 98 deletions


Lines changed: 5 additions & 0 deletions
@@ -0,0 +1,5 @@
+The `datadog_metrics` sink now defaults to the Datadog series v2 endpoint (`/api/v2/series`) and
+exposes a new `series_api_version` configuration option (`v1` or `v2`) to control which endpoint is
+used. Set `series_api_version: v1` to fall back to the legacy v1 endpoint if needed.
+
+authors: vladimir-dd
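A minimal sink configuration using the new option might look like the following. This is an illustrative sketch: the source name `my_metrics` and the env-var-based API key are placeholders, not part of the change.

```yaml
sinks:
  datadog:
    type: datadog_metrics
    inputs: ["my_metrics"]
    default_api_key: "${DD_API_KEY}"
    # Defaults to v2; set to v1 only to fall back to the legacy endpoint.
    series_api_version: v1
```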

lib/vector-stream/src/partitioned_batcher.rs

Lines changed: 10 additions & 11 deletions

@@ -211,10 +211,9 @@ where
     Prt::Key: Eq + Hash + Clone,
     Prt::Item: ByteSizeOf,
     C: BatchConfig<Prt::Item>,
-    F: Fn() -> C + Send,
+    F: Fn(&Prt::Key) -> C + Send,
 {
-    pub fn new(stream: St, partitioner: Prt, settings: F) -> Self {
-        let timeout = settings().timeout();
+    pub fn new(stream: St, partitioner: Prt, timeout: Duration, settings: F) -> Self {
         Self {
             state: settings,
             batches: HashMap::default(),
@@ -233,8 +232,8 @@ where
     Prt: Partitioner + Unpin,
     Prt::Key: Eq + Hash + Clone,
     Prt::Item: ByteSizeOf,
-    C: BatchConfig<Prt::Item>,
-    F: Fn() -> C + Send,
+    C: BatchConfig<Prt::Item, Batch = B>,
+    F: Fn(&Prt::Key) -> C + Send,
 {
     pub fn with_timer(stream: St, partitioner: Prt, timer: KT, settings: F) -> Self {
         Self {
@@ -256,7 +255,7 @@ where
     Prt::Item: ByteSizeOf,
     KT: KeyedTimer<Prt::Key>,
     C: BatchConfig<Prt::Item, Batch = B>,
-    F: Fn() -> C + Send,
+    F: Fn(&Prt::Key) -> C + Send,
 {
     type Item = (Prt::Key, B);

@@ -307,7 +306,7 @@ where
         let batch = if let Some(batch) = this.batches.get_mut(&item_key) {
             batch
         } else {
-            let batch = (this.state)();
+            let batch = (this.state)(&item_key);
             this.batches.insert(item_key.clone(), batch);
             this.timer.insert(item_key.clone());
             this.batches
@@ -479,7 +478,7 @@ mod test {
         let batch_settings = BatcherSettings::new(Duration::from_secs(1), allocation_limit, item_limit);

         let batcher = PartitionedBatcher::with_timer(&mut stream, partitioner, timer,
-            Box::new(move || batch_settings.as_byte_size_config()));
+            move |_: &u8| batch_settings.as_byte_size_config::<u64>());
         let batcher_size_hint = batcher.size_hint();

         assert_eq!(stream_size_hint, batcher_size_hint);
@@ -503,7 +502,7 @@ mod test {
         let allocation_limit = NonZeroUsize::new(allocation_limit as usize).unwrap();
         let batch_settings = BatcherSettings::new(Duration::from_secs(1), allocation_limit, item_limit);
         let mut batcher = PartitionedBatcher::with_timer(&mut stream, partitioner,
-            timer, Box::new(move || batch_settings.as_byte_size_config()));
+            timer, move |_: &u8| batch_settings.as_byte_size_config::<u64>());
         let mut batcher = Pin::new(&mut batcher);

         loop {
@@ -574,7 +573,7 @@ mod test {
         let allocation_limit = NonZeroUsize::new(allocation_limit as usize).unwrap();
         let batch_settings = BatcherSettings::new(Duration::from_secs(1), allocation_limit, item_limit);
         let mut batcher = PartitionedBatcher::with_timer(&mut stream, partitioner,
-            timer, Box::new(move || batch_settings.as_byte_size_config()));
+            timer, move |_: &u8| batch_settings.as_byte_size_config::<u64>());
         let mut batcher = Pin::new(&mut batcher);

         loop {
@@ -618,7 +617,7 @@ mod test {
         let allocation_limit = NonZeroUsize::new(allocation_limit as usize).unwrap();
         let batch_settings = BatcherSettings::new(Duration::from_secs(1), allocation_limit, item_limit);
         let mut batcher = PartitionedBatcher::with_timer(&mut stream, partitioner,
-            timer, Box::new(move || batch_settings.as_byte_size_config()));
+            timer, move |_: &u8| batch_settings.as_byte_size_config::<u64>());
         let mut batcher = Pin::new(&mut batcher);

         let mut observed_items = 0;
Lines changed: 10 additions & 0 deletions

@@ -0,0 +1,10 @@
+optimization_goal: egress_throughput
+
+target:
+  name: vector
+  command: /usr/bin/vector
+  cpu_allotment: 6
+  memory_allotment: 8GiB
+
+environment:
+  VECTOR_THREADS: 4
Lines changed: 16 additions & 0 deletions

@@ -0,0 +1,16 @@
+generator:
+  - tcp:
+      seed: [2, 3, 5, 7, 11, 13, 19, 23, 29, 31, 37, 41, 43, 47, 53, 59, 61, 67, 71, 73, 79, 83, 89, 97, 101, 103, 107, 109, 113, 127, 131, 137]
+      addr: "0.0.0.0:8125"
+      variant:
+        dogstatsd: {}
+      bytes_per_second: "500 Mb"
+      maximum_prebuild_cache_size_bytes: "256 Mb"
+
+blackhole:
+  - http:
+      binding_addr: "0.0.0.0:8080"
+
+target_metrics:
+  - prometheus:
+      uri: "http://127.0.0.1:9090/metrics"
Lines changed: 25 additions & 0 deletions

@@ -0,0 +1,25 @@
+data_dir: "/var/lib/vector"
+
+sources:
+  internal_metrics:
+    type: "internal_metrics"
+
+  statsd:
+    type: "statsd"
+    address: "0.0.0.0:8125"
+    mode: "tcp"
+
+sinks:
+  prometheus:
+    type: "prometheus_exporter"
+    inputs: ["internal_metrics"]
+    address: "0.0.0.0:9090"
+
+  datadog_metrics:
+    type: "datadog_metrics"
+    inputs: ["statsd"]
+    endpoint: "http://0.0.0.0:8080"
+    default_api_key: "DEADBEEF"
+    default_namespace: "vector"
+    healthcheck:
+      enabled: false

src/sinks/aws_cloudwatch_logs/sink.rs

Lines changed: 1 addition & 1 deletion

@@ -49,7 +49,7 @@ where
                 let age_range = start..end;
                 future::ready(age_range.contains(&req.timestamp))
             })
-            .batched_partitioned(CloudwatchPartitioner, || {
+            .batched_partitioned(CloudwatchPartitioner, batcher_settings.timeout, |_| {
                 batcher_settings.as_byte_size_config()
             })
            .map(|(key, events)| {

src/sinks/azure_common/sink.rs

Lines changed: 3 additions & 1 deletion

@@ -47,7 +47,9 @@ where
        let request_builder = self.request_builder;

        input
-            .batched_partitioned(partitioner, || settings.as_byte_size_config())
+            .batched_partitioned(partitioner, settings.timeout, |_| {
+                settings.as_byte_size_config()
+            })
            .filter_map(|(key, batch)| async move {
                // We don't need to emit an error here if the event is dropped since this will occur if the template
                // couldn't be rendered during the partitioning. A `TemplateRenderingError` is already emitted when

src/sinks/clickhouse/sink.rs

Lines changed: 2 additions & 1 deletion

@@ -43,7 +43,8 @@ where
        input
            .batched_partitioned(
                KeyPartitioner::new(self.database, self.table, self.format),
-                || batch_settings.as_byte_size_config(),
+                batch_settings.timeout,
+                |_| batch_settings.as_byte_size_config(),
            )
            .filter_map(|(key, batch)| async move { key.map(move |k| (k, batch)) })
            .request_builder(

src/sinks/datadog/logs/sink.rs

Lines changed: 1 addition & 1 deletion

@@ -386,7 +386,7 @@ where
            conforms_as_agent: self.conforms_as_agent,
        });

-        let input = input.batched_partitioned(partitioner, || {
+        let input = input.batched_partitioned(partitioner, batch_settings.timeout, |_| {
            batch_settings.as_item_size_config(HttpJsonBatchSizer)
        });
        input

src/sinks/datadog/metrics/config.rs

Lines changed: 91 additions & 29 deletions

@@ -1,9 +1,9 @@
-use std::sync::OnceLock;
-
 use http::Uri;
 use snafu::ResultExt;
 use tower::ServiceBuilder;
-use vector_lib::{config::proxy::ProxyConfig, configurable::configurable_component};
+use vector_lib::{
+    config::proxy::ProxyConfig, configurable::configurable_component, stream::BatcherSettings,
+};

 use super::{
     request_builder::DatadogMetricsRequestBuilder,
@@ -21,18 +21,13 @@ use crate::{
     },
     tls::{MaybeTlsSettings, TlsEnableableConfig},
 };
-
 #[derive(Clone, Copy, Debug, Default)]
 pub struct DatadogMetricsDefaultBatchSettings;

-// This default is centered around "series" data, which should be the lion's share of what we
-// process. Given that a single series, when encoded, is in the 150-300 byte range, we can fit a
-// lot of these into a single request, something like 150-200K series. Simply to be a little more
-// conservative, though, we use 100K here. This will also get a little more tricky when it comes to
-// distributions and sketches, but we're going to have to implement incremental encoding to handle
-// "we've exceeded our maximum payload size, split this batch" scenarios anyways.
 impl SinkBatchSettings for DatadogMetricsDefaultBatchSettings {
     const MAX_EVENTS: Option<usize> = Some(100_000);
+    // No default byte cap here; the appropriate limit (v1: 60 MiB, v2: 5 MiB) is applied at
+    // sink build time based on the active series API version.
     const MAX_BYTES: Option<usize> = None;
     const TIMEOUT_SECS: f64 = 2.0;
 }
@@ -41,9 +36,21 @@ pub(super) const SERIES_V1_PATH: &str = "/api/v1/series";
 pub(super) const SERIES_V2_PATH: &str = "/api/v2/series";
 pub(super) const SKETCHES_PATH: &str = "/api/beta/sketches";

-#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
+/// The API version to use when submitting series metrics to Datadog.
+#[configurable_component]
+#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Hash)]
+#[serde(rename_all = "snake_case")]
 pub enum SeriesApiVersion {
+    /// Use the v1 series endpoint (`/api/v1/series`).
+    ///
+    /// This is a legacy endpoint. Prefer `v2` unless you have a specific reason to use v1.
+    #[configurable(deprecated)]
     V1,
+
+    /// Use the v2 series endpoint (`/api/v2/series`).
+    ///
+    /// This is the recommended and default endpoint.
+    #[default]
     V2,
 }

@@ -54,15 +61,6 @@ impl SeriesApiVersion {
             Self::V2 => SERIES_V2_PATH,
         }
     }
-    fn get_api_version() -> Self {
-        static API_VERSION: OnceLock<SeriesApiVersion> = OnceLock::new();
-        *API_VERSION.get_or_init(|| {
-            match std::env::var("VECTOR_TEMP_USE_DD_METRICS_SERIES_V2_API") {
-                Ok(_) => Self::V2,
-                Err(_) => Self::V1,
-            }
-        })
-    }
 }

 /// Various metric type-specific API types.
@@ -94,14 +92,8 @@ impl DatadogMetricsEndpoint {
         matches!(self, Self::Series { .. })
     }

-    // Creates an instance of the `Series` variant with the default API version.
-    pub fn series() -> Self {
-        Self::Series(SeriesApiVersion::get_api_version())
-    }
-
     pub(super) const fn payload_limits(self) -> DatadogMetricsPayloadLimits {
         // from https://docs.datadoghq.com/api/latest/metrics/#submit-metrics
-
         let (uncompressed, compressed) = match self {
             // Sketches use the same payload size limits as v1 series
             DatadogMetricsEndpoint::Series(SeriesApiVersion::V1)
@@ -162,6 +154,13 @@ pub struct DatadogMetricsConfig {
     #[serde(default)]
     pub default_namespace: Option<String>,

+    /// Controls which Datadog series API endpoint is used to submit metrics.
+    ///
+    /// Defaults to `v2` (`/api/v2/series`). Set to `v1` (`/api/v1/series`) only if you need to
+    /// fall back to the legacy endpoint.
+    #[serde(default)]
+    pub series_api_version: SeriesApiVersion,
+
     #[configurable(derived)]
     #[serde(default)]
     pub batch: BatchConfig<DatadogMetricsDefaultBatchSettings>,
@@ -222,7 +221,7 @@ impl DatadogMetricsConfig {
     ) -> crate::Result<DatadogMetricsEndpointConfiguration> {
         let base_uri = self.get_base_agent_endpoint(dd_common);

-        let series_endpoint = build_uri(&base_uri, SeriesApiVersion::get_api_version().get_path())?;
+        let series_endpoint = build_uri(&base_uri, self.series_api_version.get_path())?;
         let sketches_endpoint = build_uri(&base_uri, SKETCHES_PATH)?;

         Ok(DatadogMetricsEndpointConfiguration::new(
@@ -253,7 +252,8 @@ impl DatadogMetricsConfig {
         dd_common: &DatadogCommonConfig,
         client: HttpClient,
     ) -> crate::Result<VectorSink> {
-        let batcher_settings = self.batch.into_batcher_settings()?;
+        let (batcher_settings, sketches_batcher_settings) =
+            resolve_endpoint_batch_settings(self.batch, self.series_api_version)?;

         // TODO: revisit our concurrency and batching defaults
         let request_limits = self.request.into_settings();
@@ -269,10 +269,18 @@ impl DatadogMetricsConfig {
         let request_builder = DatadogMetricsRequestBuilder::new(
             endpoint_configuration,
             self.default_namespace.clone(),
+            self.series_api_version,
         )?;

         let protocol = self.get_protocol(dd_common);
-        let sink = DatadogMetricsSink::new(service, request_builder, batcher_settings, protocol);
+        let sink = DatadogMetricsSink::new(
+            service,
+            request_builder,
+            batcher_settings,
+            sketches_batcher_settings,
+            protocol,
+            self.series_api_version,
+        );

         Ok(VectorSink::from_event_streamsink(sink))
     }
@@ -287,6 +295,28 @@ impl DatadogMetricsConfig {
     }
 }

+/// Returns `(series_settings, sketches_settings)`.
+///
+/// When the user has not set an explicit `max_bytes`, each endpoint is capped to its own
+/// uncompressed payload limit (5 MiB for Series v2, 60 MiB for Sketches). When an explicit
+/// limit is configured, both endpoints share it.
+fn resolve_endpoint_batch_settings(
+    batch: BatchConfig<DatadogMetricsDefaultBatchSettings>,
+    series_version: SeriesApiVersion,
+) -> crate::Result<(BatcherSettings, BatcherSettings)> {
+    let mut series = batch.into_batcher_settings()?;
+    let mut sketches = series;
+    if series.size_limit == usize::MAX {
+        series.size_limit = DatadogMetricsEndpoint::Series(series_version)
+            .payload_limits()
+            .uncompressed;
+        sketches.size_limit = DatadogMetricsEndpoint::Sketches
+            .payload_limits()
+            .uncompressed;
+    }
+    Ok((series, sketches))
+}
+
 fn build_uri(host: &str, endpoint: &str) -> crate::Result<Uri> {
     let result = format!("{host}{endpoint}")
         .parse::<Uri>()
@@ -302,4 +332,36 @@ mod tests {
     fn generate_config() {
         crate::test_util::test_generate_config::<DatadogMetricsConfig>();
     }
+
+    // When max_bytes is unset, each endpoint gets its own API payload limit.
+    #[test]
+    fn default_batch_config_uses_endpoint_specific_size_limits() {
+        let (series, sketches) =
+            resolve_endpoint_batch_settings(BatchConfig::default(), SeriesApiVersion::V2).unwrap();
+
+        assert_eq!(series.size_limit, 5_242_880); // 5 MiB — Series v2 limit
+        assert_eq!(sketches.size_limit, 62_914_560); // 60 MiB — Sketches limit
+    }
+
+    #[test]
+    fn v1_batch_config_uses_v1_size_limit() {
+        let (series, sketches) =
+            resolve_endpoint_batch_settings(BatchConfig::default(), SeriesApiVersion::V1).unwrap();

+        assert_eq!(series.size_limit, 62_914_560); // 60 MiB — Series v1 limit
+        assert_eq!(sketches.size_limit, 62_914_560); // 60 MiB — Sketches limit
+    }
+
+    // When the user sets max_bytes, both endpoints share that limit unchanged.
+    #[test]
+    fn explicit_max_bytes_applies_to_both_endpoints() {
+        let mut config = BatchConfig::<DatadogMetricsDefaultBatchSettings>::default();
+        config.max_bytes = Some(1_000_000);
+
+        let (series, sketches) =
+            resolve_endpoint_batch_settings(config, SeriesApiVersion::V2).unwrap();
+
+        assert_eq!(series.size_limit, 1_000_000);
+        assert_eq!(sketches.size_limit, 1_000_000);
+    }
 }
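The fallback rule implemented by `resolve_endpoint_batch_settings` can be modeled standalone. This sketch uses a simplified stand-in `Settings` type rather than Vector's actual `BatcherSettings`; the limits (5 MiB for Series v2, 60 MiB for Series v1 and Sketches) come from the diff above.

```rust
// Simplified stand-in for Vector's BatcherSettings (hypothetical type).
#[derive(Clone, Copy, Debug, PartialEq)]
struct Settings {
    size_limit: usize,
}

const SERIES_V2_LIMIT: usize = 5 * 1024 * 1024; // 5 MiB uncompressed
const SERIES_V1_LIMIT: usize = 60 * 1024 * 1024; // 60 MiB uncompressed
const SKETCHES_LIMIT: usize = 60 * 1024 * 1024; // 60 MiB uncompressed

// Mirrors the diff's rule: an unset max_bytes (modeled as usize::MAX) is
// replaced with per-endpoint caps; an explicit limit is shared unchanged.
fn resolve(user_limit: Option<usize>, series_cap: usize) -> (Settings, Settings) {
    let base = Settings {
        size_limit: user_limit.unwrap_or(usize::MAX),
    };
    let mut series = base;
    let mut sketches = base;
    if series.size_limit == usize::MAX {
        series.size_limit = series_cap;
        sketches.size_limit = SKETCHES_LIMIT;
    }
    (series, sketches)
}

fn main() {
    // Default config with v2: each endpoint gets its own cap.
    let (series, sketches) = resolve(None, SERIES_V2_LIMIT);
    assert_eq!(series.size_limit, 5_242_880);
    assert_eq!(sketches.size_limit, 62_914_560);

    // Default config with v1: both endpoints end up at 60 MiB.
    let (series_v1, _) = resolve(None, SERIES_V1_LIMIT);
    assert_eq!(series_v1.size_limit, 62_914_560);

    // Explicit max_bytes: both endpoints share the user's limit.
    let (series, sketches) = resolve(Some(1_000_000), SERIES_V2_LIMIT);
    assert_eq!(series.size_limit, 1_000_000);
    assert_eq!(sketches.size_limit, 1_000_000);
    println!("all cases resolved");
}
```

The design choice here is that an explicit user `max_bytes` always wins for both endpoints; only the absent-limit case is specialized per endpoint, which keeps user configuration behavior unchanged by the refactor.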
