Skip to content

Commit 517371f

Browse files
committed
fix(aggregate): Fix incorrectly dropped events bug in metrics aggregate processor
1 parent c5959d7 commit 517371f

2 files changed

Lines changed: 138 additions & 66 deletions

File tree

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
The `aggregate` transform now correctly passes through metrics whose kind is not supported by the configured mode, rather than silently dropping them. For example, `absolute` metrics flowing through a `sum`-mode aggregate are forwarded to the output unchanged without any aggregation & without being dropped
2+
3+
authors: ArunPiduguDD

src/transforms/aggregate.rs

Lines changed: 135 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,14 @@ impl TransformConfig for AggregateConfig {
165165

166166
type MetricEntry = (MetricData, EventMetadata);
167167

168+
// Never stored in a collection, only ephemeral/transient to forward ignored events
169+
#[allow(clippy::large_enum_variant)]
170+
#[derive(Debug)]
171+
pub enum RecordOutcome {
172+
Aggregated,
173+
Passthrough(Event),
174+
}
175+
168176
#[derive(Debug)]
169177
pub struct Aggregate {
170178
interval: Duration,
@@ -181,9 +189,25 @@ impl Aggregate {
181189
})
182190
}
183191

184-
pub fn record(&mut self, event: Event) {
185-
let (series, data, metadata) = event.into_metric().into_parts();
192+
pub fn record(&mut self, event: Event) -> RecordOutcome {
193+
let kind = event.as_metric().kind();
194+
if matches!(
195+
(&self.mode, kind),
196+
(InnerMode::Sum, MetricKind::Absolute)
197+
| (
198+
InnerMode::Latest | InnerMode::Diff { .. },
199+
MetricKind::Incremental
200+
)
201+
| (InnerMode::Max | InnerMode::Min, MetricKind::Incremental)
202+
| (
203+
InnerMode::Mean { .. } | InnerMode::Stdev { .. },
204+
MetricKind::Incremental
205+
)
206+
) {
207+
return RecordOutcome::Passthrough(event);
208+
}
186209

210+
let (series, data, metadata) = event.into_metric().into_parts();
187211
match &mut self.mode {
188212
InnerMode::Auto => match data.kind {
189213
MetricKind::Incremental => self.record_sum(series, data, metadata),
@@ -192,33 +216,30 @@ impl Aggregate {
192216
}
193217
},
194218
InnerMode::Sum => self.record_sum(series, data, metadata),
195-
InnerMode::Latest | InnerMode::Diff { .. } => match data.kind {
196-
MetricKind::Incremental => (),
197-
MetricKind::Absolute => {
198-
self.map.insert(series, (data, metadata));
199-
}
200-
},
219+
InnerMode::Latest | InnerMode::Diff { .. } => {
220+
self.map.insert(series, (data, metadata));
221+
}
201222
InnerMode::Count => self.record_count(series, data, metadata),
202223
InnerMode::Max | InnerMode::Min => self.record_comparison(series, data, metadata),
203-
InnerMode::Mean { multi_map } | InnerMode::Stdev { multi_map } => match data.kind {
204-
MetricKind::Incremental => (),
205-
MetricKind::Absolute => {
206-
if matches!(data.value, MetricValue::Gauge { value: _ }) {
207-
match multi_map.entry(series) {
208-
Entry::Occupied(mut entry) => {
209-
let existing = entry.get_mut();
210-
existing.push((data, metadata));
211-
}
212-
Entry::Vacant(entry) => {
213-
entry.insert(vec![(data, metadata)]);
214-
}
224+
InnerMode::Mean { multi_map } | InnerMode::Stdev { multi_map } => {
225+
if matches!(data.value, MetricValue::Gauge { value: _ }) {
226+
match multi_map.entry(series) {
227+
Entry::Occupied(mut entry) => entry.get_mut().push((data, metadata)),
228+
Entry::Vacant(entry) => {
229+
entry.insert(vec![(data, metadata)]);
215230
}
216231
}
217232
}
218-
},
233+
}
219234
}
220-
221235
emit!(AggregateEventRecorded);
236+
RecordOutcome::Aggregated
237+
}
238+
239+
pub fn record_into(&mut self, event: Event, output: &mut Vec<Event>) {
240+
if let RecordOutcome::Passthrough(event) = self.record(event) {
241+
output.push(event);
242+
}
222243
}
223244

224245
fn record_count(
@@ -241,23 +262,20 @@ impl Aggregate {
241262
}
242263

243264
fn record_sum(&mut self, series: MetricSeries, data: MetricData, metadata: EventMetadata) {
244-
match data.kind {
245-
MetricKind::Incremental => match self.map.entry(series) {
246-
Entry::Occupied(mut entry) => {
247-
let existing = entry.get_mut();
248-
// In order to update (add) the new and old kind's must match
249-
if existing.0.kind == data.kind && existing.0.update(&data) {
250-
existing.1.merge(metadata);
251-
} else {
252-
emit!(AggregateUpdateFailed);
253-
*existing = (data, metadata);
254-
}
255-
}
256-
Entry::Vacant(entry) => {
257-
entry.insert((data, metadata));
265+
match self.map.entry(series) {
266+
Entry::Occupied(mut entry) => {
267+
let existing = entry.get_mut();
268+
// In order to update (add) the new and old kind's must match
269+
if existing.0.kind == data.kind && existing.0.update(&data) {
270+
existing.1.merge(metadata);
271+
} else {
272+
emit!(AggregateUpdateFailed);
273+
*existing = (data, metadata);
258274
}
259-
},
260-
MetricKind::Absolute => {}
275+
}
276+
Entry::Vacant(entry) => {
277+
entry.insert((data, metadata));
278+
}
261279
}
262280
}
263281

@@ -267,36 +285,33 @@ impl Aggregate {
267285
data: MetricData,
268286
metadata: EventMetadata,
269287
) {
270-
match data.kind {
271-
MetricKind::Incremental => (),
272-
MetricKind::Absolute => match self.map.entry(series) {
273-
Entry::Occupied(mut entry) => {
274-
let existing = entry.get_mut();
275-
// In order to update (add) the new and old kind's must match
276-
if existing.0.kind == data.kind {
277-
if let MetricValue::Gauge {
278-
value: existing_value,
279-
} = existing.0.value()
280-
&& let MetricValue::Gauge { value: new_value } = data.value()
281-
{
282-
let should_update = match self.mode {
283-
InnerMode::Max => new_value > existing_value,
284-
InnerMode::Min => new_value < existing_value,
285-
_ => false,
286-
};
287-
if should_update {
288-
*existing = (data, metadata);
289-
}
288+
match self.map.entry(series) {
289+
Entry::Occupied(mut entry) => {
290+
let existing = entry.get_mut();
291+
// In order to update (add) the new and old kind's must match
292+
if existing.0.kind == data.kind {
293+
if let MetricValue::Gauge {
294+
value: existing_value,
295+
} = existing.0.value()
296+
&& let MetricValue::Gauge { value: new_value } = data.value()
297+
{
298+
let should_update = match self.mode {
299+
InnerMode::Max => new_value > existing_value,
300+
InnerMode::Min => new_value < existing_value,
301+
_ => false,
302+
};
303+
if should_update {
304+
*existing = (data, metadata);
290305
}
291-
} else {
292-
emit!(AggregateUpdateFailed);
293-
*existing = (data, metadata);
294306
}
307+
} else {
308+
emit!(AggregateUpdateFailed);
309+
*existing = (data, metadata);
295310
}
296-
Entry::Vacant(entry) => {
297-
entry.insert((data, metadata));
298-
}
299-
},
311+
}
312+
Entry::Vacant(entry) => {
313+
entry.insert((data, metadata));
314+
}
300315
}
301316
}
302317

@@ -405,7 +420,7 @@ impl TaskTransform<Event> for Aggregate {
405420
self.flush_into(&mut output);
406421
done = true;
407422
}
408-
Some(event) => self.record(event),
423+
Some(event) => self.record_into(event, &mut output),
409424
}
410425
}
411426
};
@@ -949,6 +964,60 @@ mod tests {
949964
assert_eq!(&stdev_result, &out[0]);
950965
}
951966

967+
#[test]
968+
fn passes_through_ignored_kind() {
969+
// Sum mode aggregates incremental, passes through absolute without collapsing.
970+
let mut agg = Aggregate::new(&AggregateConfig {
971+
interval_ms: 1000_u64,
972+
mode: AggregationMode::Sum,
973+
})
974+
.unwrap();
975+
976+
let counter_1 = make_metric(
977+
"counter_a",
978+
MetricKind::Incremental,
979+
MetricValue::Counter { value: 10.0 },
980+
);
981+
let counter_2 = make_metric(
982+
"counter_a",
983+
MetricKind::Incremental,
984+
MetricValue::Counter { value: 5.0 },
985+
);
986+
let counter_summed = make_metric(
987+
"counter_a",
988+
MetricKind::Incremental,
989+
MetricValue::Counter { value: 15.0 },
990+
);
991+
let gauge_1 = make_metric(
992+
"gauge_a",
993+
MetricKind::Absolute,
994+
MetricValue::Gauge { value: 42.0 },
995+
);
996+
let gauge_2 = make_metric(
997+
"gauge_a",
998+
MetricKind::Absolute,
999+
MetricValue::Gauge { value: 99.0 },
1000+
);
1001+
1002+
// Absolute metrics pass through immediately (not held until flush).
1003+
assert!(
1004+
matches!(agg.record(gauge_1.clone()), RecordOutcome::Passthrough(e) if e == gauge_1)
1005+
);
1006+
assert!(
1007+
matches!(agg.record(gauge_2.clone()), RecordOutcome::Passthrough(e) if e == gauge_2)
1008+
);
1009+
1010+
// Each is returned individually — no collapsing to latest.
1011+
agg.record(counter_1);
1012+
agg.record(counter_2);
1013+
1014+
let mut out = vec![];
1015+
agg.flush_into(&mut out);
1016+
// Only the summed incremental counter appears at flush; the gauges already passed through.
1017+
assert_eq!(1, out.len());
1018+
assert_eq!(&counter_summed, &out[0]);
1019+
}
1020+
9521021
#[test]
9531022
fn conflicting_value_type() {
9541023
let mut agg = Aggregate::new(&AggregateConfig {

0 commit comments

Comments
 (0)