Skip to content

Commit ce65175

Browse files
authored
feat(sources): add source latency metric and fix source lag time on large batches (vectordotdev#24987)
* add send latency distribution to source sender
* add changelog
* use outputMetrics in the builder
* fix clippy
* apply suggestion
* only record the await point duration
* make sure metrics are updated if an error is returned
* add documentation for the added metrics
* fix source_lag_time_seconds measurement for large batches
1 parent 5ba8405 commit ce65175

7 files changed

Lines changed: 122 additions & 28 deletions

File tree

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
Fixed an incorrect `source_lag_time_seconds` measurement in sources that use `send_batch` with large event batches. When a batch was split into multiple chunks, the reference timestamp used to compute lag time was re-captured on each chunk send, causing the lag time for later chunks to be overstated by the amount of time spent waiting for the channel to accept earlier chunks. The reference timestamp is now captured once before iteration and shared across all chunks.
2+
3+
authors: gwenaskell
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
Sources now record two distribution metrics: `source_send_latency_seconds` (the time spent
2+
blocking while sending a single chunk of events to the output) and `source_send_batch_latency_seconds`
3+
(the total time spent sending all chunks of a received event batch).
4+
5+
authors: gwenaskell

lib/vector-core/src/source_sender/builder.rs

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,20 @@
11
use std::{collections::HashMap, time::Duration};
22

3-
use metrics::{Histogram, histogram};
3+
use metrics::histogram;
44
use vector_buffers::topology::channel::LimitedReceiver;
55
use vector_common::internal_event::DEFAULT_OUTPUT;
66

7-
use super::{CHUNK_SIZE, LAG_TIME_NAME, Output, SourceSender, SourceSenderItem};
7+
use super::{
8+
CHUNK_SIZE, LAG_TIME_NAME, Output, OutputMetrics, SEND_BATCH_LATENCY_NAME, SEND_LATENCY_NAME,
9+
SourceSender, SourceSenderItem,
10+
};
811
use crate::config::{ComponentKey, OutputId, SourceOutput};
912

1013
pub struct Builder {
1114
buf_size: usize,
1215
default_output: Option<Output>,
1316
named_outputs: HashMap<String, Output>,
14-
lag_time: Option<Histogram>,
17+
output_metrics: OutputMetrics,
1518
timeout: Option<Duration>,
1619
ewma_half_life_seconds: Option<f64>,
1720
}
@@ -22,7 +25,11 @@ impl Default for Builder {
2225
buf_size: CHUNK_SIZE,
2326
default_output: None,
2427
named_outputs: Default::default(),
25-
lag_time: Some(histogram!(LAG_TIME_NAME)),
28+
output_metrics: OutputMetrics::new(
29+
Some(histogram!(LAG_TIME_NAME)),
30+
Some(histogram!(SEND_LATENCY_NAME)),
31+
Some(histogram!(SEND_BATCH_LATENCY_NAME)),
32+
),
2633
timeout: None,
2734
ewma_half_life_seconds: None,
2835
}
@@ -53,7 +60,6 @@ impl Builder {
5360
output: SourceOutput,
5461
component_key: ComponentKey,
5562
) -> LimitedReceiver<SourceSenderItem> {
56-
let lag_time = self.lag_time.clone();
5763
let log_definition = output.schema_definition.clone();
5864
let output_id = OutputId {
5965
component: component_key,
@@ -64,7 +70,7 @@ impl Builder {
6470
let (output, rx) = Output::new_with_buffer(
6571
self.buf_size,
6672
DEFAULT_OUTPUT.to_owned(),
67-
lag_time,
73+
self.output_metrics.clone(),
6874
log_definition,
6975
output_id,
7076
self.timeout,
@@ -77,7 +83,7 @@ impl Builder {
7783
let (output, rx) = Output::new_with_buffer(
7884
self.buf_size,
7985
name.clone(),
80-
lag_time,
86+
self.output_metrics.clone(),
8187
log_definition,
8288
output_id,
8389
self.timeout,

lib/vector-core/src/source_sender/mod.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ mod tests;
1414

1515
pub use builder::Builder;
1616
pub use errors::SendError;
17-
use output::Output;
17+
use output::{Output, OutputMetrics};
1818
pub use sender::{SourceSender, SourceSenderItem};
1919

2020
pub const CHUNK_SIZE: usize = 1000;
@@ -23,3 +23,5 @@ pub const CHUNK_SIZE: usize = 1000;
2323
const TEST_BUFFER_SIZE: usize = 100;
2424

2525
const LAG_TIME_NAME: &str = "source_lag_time_seconds";
26+
const SEND_LATENCY_NAME: &str = "source_send_latency_seconds";
27+
const SEND_BATCH_LATENCY_NAME: &str = "source_send_batch_latency_seconds";

lib/vector-core/src/source_sender/output.rs

Lines changed: 71 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ impl Drop for UnsentEventCount {
8585
#[derive(Clone)]
8686
pub(super) struct Output {
8787
sender: LimitedSender<SourceSenderItem>,
88-
lag_time: Option<Histogram>,
88+
metrics: OutputMetrics,
8989
events_sent: Registered<EventsSent>,
9090
/// The schema definition that will be attached to Log events sent through here
9191
log_definition: Option<Arc<Definition>>,
@@ -95,6 +95,27 @@ pub(super) struct Output {
9595
timeout: Option<Duration>,
9696
}
9797

98+
#[derive(Clone, Default)]
99+
pub(super) struct OutputMetrics {
100+
lag_time: Option<Histogram>,
101+
send_latency: Option<Histogram>,
102+
send_batch_latency: Option<Histogram>,
103+
}
104+
105+
impl OutputMetrics {
106+
pub(super) fn new(
107+
lag_time: Option<Histogram>,
108+
send_latency: Option<Histogram>,
109+
send_batch_latency: Option<Histogram>,
110+
) -> Self {
111+
Self {
112+
lag_time,
113+
send_latency,
114+
send_batch_latency,
115+
}
116+
}
117+
}
118+
98119
#[expect(clippy::missing_fields_in_debug)]
99120
impl fmt::Debug for Output {
100121
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
@@ -111,19 +132,20 @@ impl Output {
111132
pub(super) fn new_with_buffer(
112133
n: usize,
113134
output: String,
114-
lag_time: Option<Histogram>,
135+
metrics: OutputMetrics,
115136
log_definition: Option<Arc<Definition>>,
116137
output_id: OutputId,
117138
timeout: Option<Duration>,
118139
ewma_half_life_seconds: Option<f64>,
119140
) -> (Self, LimitedReceiver<SourceSenderItem>) {
120141
let limit = MemoryBufferSize::MaxEvents(NonZeroUsize::new(n).unwrap());
121-
let metrics = ChannelMetricMetadata::new(UTILIZATION_METRIC_PREFIX, Some(output.clone()));
122-
let (tx, rx) = channel::limited(limit, Some(metrics), ewma_half_life_seconds);
142+
let channel_metrics =
143+
ChannelMetricMetadata::new(UTILIZATION_METRIC_PREFIX, Some(output.clone()));
144+
let (tx, rx) = channel::limited(limit, Some(channel_metrics), ewma_half_life_seconds);
123145
(
124146
Self {
125147
sender: tx,
126-
lag_time,
148+
metrics,
127149
events_sent: internal_event::register(EventsSent::from(internal_event::Output(
128150
Some(output.into()),
129151
))),
@@ -136,12 +158,21 @@ impl Output {
136158
}
137159

138160
pub(super) async fn send(
161+
&mut self,
162+
events: EventArray,
163+
unsent_event_count: &mut UnsentEventCount,
164+
) -> Result<(), SendError> {
165+
let reference = Utc::now().timestamp_millis();
166+
self.send_inner(events, unsent_event_count, reference).await
167+
}
168+
169+
async fn send_inner(
139170
&mut self,
140171
mut events: EventArray,
141172
unsent_event_count: &mut UnsentEventCount,
173+
reference: i64,
142174
) -> Result<(), SendError> {
143175
let send_reference = Instant::now();
144-
let reference = Utc::now().timestamp_millis();
145176
events
146177
.iter_events()
147178
.for_each(|event| self.emit_lag_time(event, reference));
@@ -156,7 +187,17 @@ impl Output {
156187

157188
let byte_size = events.estimated_json_encoded_size_of();
158189
let count = events.len();
159-
self.send_with_timeout(events, send_reference).await?;
190+
191+
let send_start = Instant::now();
192+
193+
let send_result = self.send_with_timeout(events, send_reference).await;
194+
195+
if let Some(send_latency) = &self.metrics.send_latency {
196+
send_latency.record(send_start.elapsed().as_secs_f64());
197+
}
198+
199+
send_result?;
200+
160201
self.events_sent.emit(CountByteSize(count, byte_size));
161202
unsent_event_count.decr(count);
162203
Ok(())
@@ -218,33 +259,47 @@ impl Output {
218259
I: IntoIterator<Item = E>,
219260
<I as IntoIterator>::IntoIter: ExactSizeIterator,
220261
{
262+
// Capture a single reference timestamp for the entire batch so that lag time
263+
// measurements are not inflated by channel-send latency for later chunks.
264+
let reference = Utc::now().timestamp_millis();
265+
221266
// It's possible that the caller stops polling this future while it is blocked waiting
222267
// on `self.send_inner()`. When that happens, we use `UnsentEventCount` to correctly emit
223268
// `ComponentEventsDropped` events.
224269
let events = events.into_iter().map(Into::into);
225270
let mut unsent_event_count = UnsentEventCount::new(events.len());
271+
let send_batch_start = Instant::now();
272+
226273
for events in array::events_into_arrays(events, Some(CHUNK_SIZE)) {
227-
self.send(events, &mut unsent_event_count)
274+
self.send_inner(events, &mut unsent_event_count, reference)
228275
.await
229-
.inspect_err(|error| match error {
230-
SendError::Timeout => {
231-
unsent_event_count.timed_out();
276+
.inspect_err(|error| {
277+
match error {
278+
SendError::Timeout => {
279+
unsent_event_count.timed_out();
280+
}
281+
SendError::Closed => {
282+
// The unsent event count is discarded here because the callee emits the
283+
// `StreamClosedError`.
284+
unsent_event_count.discard();
285+
}
232286
}
233-
SendError::Closed => {
234-
// The unsent event count is discarded here because the callee emits the
235-
// `StreamClosedError`.
236-
unsent_event_count.discard();
287+
if let Some(send_batch_latency) = &self.metrics.send_batch_latency {
288+
send_batch_latency.record(send_batch_start.elapsed().as_secs_f64());
237289
}
238290
})?;
239291
}
292+
if let Some(send_batch_latency) = &self.metrics.send_batch_latency {
293+
send_batch_latency.record(send_batch_start.elapsed().as_secs_f64());
294+
}
240295
Ok(())
241296
}
242297

243298
/// Calculate the difference between the reference time and the
244299
/// timestamp stored in the given event reference, and emit the
245300
/// difference, expressed in milliseconds, as a histogram.
246301
pub(super) fn emit_lag_time(&self, event: EventRef<'_>, reference: i64) {
247-
if let Some(lag_time_metric) = &self.lag_time {
302+
if let Some(lag_time_metric) = &self.metrics.lag_time {
248303
let timestamp = match event {
249304
EventRef::Log(log) => {
250305
log_schema()

lib/vector-core/src/source_sender/sender.rs

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,9 @@ use vector_common::{
2222

2323
use super::{Builder, Output, SendError};
2424
#[cfg(any(test, feature = "test"))]
25-
use super::{LAG_TIME_NAME, TEST_BUFFER_SIZE};
25+
use super::{
26+
LAG_TIME_NAME, OutputMetrics, SEND_BATCH_LATENCY_NAME, SEND_LATENCY_NAME, TEST_BUFFER_SIZE,
27+
};
2628
use crate::{
2729
EstimatedJsonEncodedSizeOf,
2830
event::{Event, EventArray, EventContainer, array::EventArrayIntoIter},
@@ -108,14 +110,16 @@ impl SourceSender {
108110
timeout: Option<Duration>,
109111
) -> (Self, LimitedReceiver<SourceSenderItem>) {
110112
let lag_time = Some(histogram!(LAG_TIME_NAME));
113+
let send_latency = Some(histogram!(SEND_LATENCY_NAME));
114+
let send_batch_latency = Some(histogram!(SEND_BATCH_LATENCY_NAME));
111115
let output_id = OutputId {
112116
component: "test".to_string().into(),
113117
port: None,
114118
};
115119
let (default_output, rx) = Output::new_with_buffer(
116120
n,
117121
DEFAULT_OUTPUT.to_owned(),
118-
lag_time,
122+
OutputMetrics::new(lag_time, send_latency, send_batch_latency),
119123
None,
120124
output_id,
121125
timeout,
@@ -192,8 +196,15 @@ impl SourceSender {
192196
component: "test".to_string().into(),
193197
port: Some(name.clone()),
194198
};
195-
let (output, recv) =
196-
Output::new_with_buffer(100, name.clone(), None, None, output_id, None, None);
199+
let (output, recv) = Output::new_with_buffer(
200+
100,
201+
name.clone(),
202+
OutputMetrics::default(),
203+
None,
204+
output_id,
205+
None,
206+
None,
207+
);
197208
let recv = recv.into_stream().map(move |mut item| {
198209
item.events.iter_events_mut().for_each(|mut event| {
199210
let metadata = event.metadata_mut();

website/cue/reference/components/sources/internal_metrics.cue

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -774,6 +774,18 @@ components: sources: internal_metrics: {
774774
default_namespace: "vector"
775775
tags: _component_tags
776776
}
777+
source_send_batch_latency_seconds: {
778+
description: "The time elapsed blocking on the downstream channel to accept an entire batch of events received at the source"
779+
type: "histogram"
780+
default_namespace: "vector"
781+
tags: _component_tags
782+
}
783+
source_send_latency_seconds: {
784+
description: "The time elapsed blocking on the downstream channel to accept a single chunk from a batch of events received at the source"
785+
type: "histogram"
786+
default_namespace: "vector"
787+
tags: _component_tags
788+
}
777789
source_buffer_max_byte_size: {
778790
description: "The maximum number of bytes the source buffer can hold. The outputs of the source send data to this buffer."
779791
type: "gauge"

0 commit comments

Comments
 (0)