Skip to content

Commit 5ef4ab5

Browse files
authored
[query-engine] Check schema when processing summaries back into logs in OTLP RecordSet bridge (open-telemetry#2441)
# Changes * Check schema when converting summary records into log records in OTLP RecordSet bridge # Details @drewrelmas noticed if you do something like `"source | summarize Count = count() by severity_text` or `"source | summarize Count = count() | extend body = 'Summary record'` the data for `severity_text` or `body` ends up on `Attributes`. This PR makes the summary code smarter to detect when top-level things are set on a summary.
1 parent 5e0c424 commit 5ef4ab5

1 file changed

Lines changed: 237 additions & 29 deletions

File tree

  • rust/experimental/query_engine/engine-recordset-otlp-bridge/src

rust/experimental/query_engine/engine-recordset-otlp-bridge/src/bridge.rs

Lines changed: 237 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -301,69 +301,73 @@ pub fn process_export_logs_service_request_using_pipeline(
301301
}
302302

303303
if has_summaries {
304+
let schema = get_log_record_schema();
304305
let mut log_records = Vec::new();
305306

306307
for summary in final_results.summaries.included_summaries {
307308
let diagnostics = summary.diagnostics;
308309

309310
let mut log_record = if let Some(map) = summary.map {
310-
let mut attributes: Vec<(Box<str>, AnyValue)> =
311-
Vec::with_capacity(map.len() + 1);
311+
let mut log_record = LogRecord::new();
312312

313-
attributes.extend(
314-
map.take_values()
315-
.drain()
316-
.map(|(key, value)| (key, value.into())),
317-
);
313+
let mut attributes: Vec<(Box<str>, AnyValue)> = Vec::with_capacity(map.len());
314+
315+
for (key, value) in map.take_values().drain() {
316+
push_value(schema, &mut log_record, &mut attributes, key, value);
317+
}
318318

319-
LogRecord::new().with_attributes(attributes)
319+
log_record.with_attributes(attributes)
320320
} else {
321+
let mut log_record = LogRecord::new();
322+
321323
let mut attributes: Vec<(Box<str>, AnyValue)> = Vec::with_capacity(
322-
summary.aggregation_values.len() + summary.group_by_values.len() + 1,
324+
summary.aggregation_values.len() + summary.group_by_values.len(),
323325
);
324326

325327
for (key, value) in summary.group_by_values {
326-
attributes.push((key, value.into()));
328+
push_value(schema, &mut log_record, &mut attributes, key, value);
327329
}
328330

329331
for (key, value) in summary.aggregation_values {
330332
match value {
331333
SummaryAggregation::Average { count, sum } => {
332334
let avg = sum.to_double() / count as f64;
333335

334-
attributes.push((
336+
push_value(
337+
schema,
338+
&mut log_record,
339+
&mut attributes,
335340
key,
336-
AnyValue::Native(OtlpAnyValue::DoubleValue(
337-
DoubleValueStorage::new(avg),
338-
)),
339-
));
341+
OwnedValue::Double(DoubleValueStorage::new(avg)),
342+
);
340343
}
341344
SummaryAggregation::Count(v) => {
342-
attributes.push((
345+
push_value(
346+
schema,
347+
&mut log_record,
348+
&mut attributes,
343349
key,
344-
AnyValue::Native(OtlpAnyValue::IntValue(
345-
IntegerValueStorage::new(v as i64),
346-
)),
347-
));
350+
OwnedValue::Integer(IntegerValueStorage::new(v as i64)),
351+
);
348352
}
349353
SummaryAggregation::Maximum(v) | SummaryAggregation::Minimum(v) => {
350-
attributes.push((key, v.into()));
354+
push_value(schema, &mut log_record, &mut attributes, key, v);
351355
}
352356
SummaryAggregation::Sum(v) => {
353357
let v = match v {
354-
SummaryValue::Double(d) => AnyValue::Native(
355-
OtlpAnyValue::DoubleValue(DoubleValueStorage::new(d)),
356-
),
357-
SummaryValue::Integer(i) => AnyValue::Native(
358-
OtlpAnyValue::IntValue(IntegerValueStorage::new(i)),
359-
),
358+
SummaryValue::Double(d) => {
359+
OwnedValue::Double(DoubleValueStorage::new(d))
360+
}
361+
SummaryValue::Integer(i) => {
362+
OwnedValue::Integer(IntegerValueStorage::new(i))
363+
}
360364
};
361-
attributes.push((key, v));
365+
push_value(schema, &mut log_record, &mut attributes, key, v);
362366
}
363367
}
364368
}
365369

366-
LogRecord::new().with_attributes(attributes)
370+
log_record.with_attributes(attributes)
367371
};
368372

369373
handle_diagnostics(
@@ -403,6 +407,23 @@ pub fn process_export_logs_service_request_using_pipeline(
403407
})
404408
}
405409

410+
fn push_value(
411+
schema: &ParserMapSchema,
412+
log_record: &mut LogRecord,
413+
attributes: &mut Vec<(Box<str>, AnyValue)>,
414+
key: Box<str>,
415+
value: OwnedValue,
416+
) {
417+
if schema
418+
.get_schema_for_key(schema.normalize_key(&key))
419+
.is_some()
420+
{
421+
log_record.set(&key, ResolvedValue::Computed(value));
422+
} else {
423+
attributes.push((key, value.into()));
424+
}
425+
}
426+
406427
fn build_parser_options(options: &mut BridgeOptions) -> Result<ParserOptions, ParserError> {
407428
let mut parser_options = ParserOptions::new().with_attached_data_names(&[
408429
"resource",
@@ -784,6 +805,193 @@ mod tests {
784805
assert!(!dropped_records.is_empty());
785806
}
786807

808+
#[test]
809+
fn test_process_parsed_export_logs_service_request_summary_field_mapping() {
810+
let request = ExportLogsServiceRequest::new().with_resource_logs(
811+
ResourceLogs::new().with_scope_logs(
812+
ScopeLogs::new()
813+
.with_log_record(LogRecord::new().with_severity_text("INFO".into()))
814+
.with_log_record(LogRecord::new().with_severity_text("WARN".into())),
815+
),
816+
);
817+
818+
let pipeline = parse_kql_query_into_pipeline(
819+
"source | summarize Count = count() by severityText",
820+
None,
821+
)
822+
.unwrap();
823+
824+
let mut response = process_export_logs_service_request_using_pipeline(
825+
&pipeline,
826+
RecordSetEngineDiagnosticLevel::Verbose,
827+
request,
828+
)
829+
.unwrap();
830+
831+
assert_eq!(2, response.included_record_count);
832+
assert_eq!(2, response.dropped_record_count);
833+
834+
let response_otlp = response.included_records.as_mut().unwrap();
835+
836+
let response_summaries = &mut response_otlp.resource_logs[1].scope_logs[0].log_records;
837+
838+
response_summaries.sort_by(|l, r| {
839+
let l = l
840+
.severity_text
841+
.as_ref()
842+
.map(|v| v.get_value())
843+
.unwrap_or("");
844+
let r = r
845+
.severity_text
846+
.as_ref()
847+
.map(|v| v.get_value())
848+
.unwrap_or("");
849+
l.cmp(r)
850+
});
851+
852+
assert_eq!(
853+
Some("INFO"),
854+
response_summaries[0]
855+
.severity_text
856+
.as_ref()
857+
.map(|v| v.get_value())
858+
);
859+
assert_eq!(
860+
Some(Value::Integer(&IntegerValueStorage::new(1))),
861+
response_summaries[0]
862+
.attributes
863+
.get("Count")
864+
.map(|v| v.to_value())
865+
);
866+
867+
assert_eq!(
868+
Some("WARN"),
869+
response_summaries[1]
870+
.severity_text
871+
.as_ref()
872+
.map(|v| v.get_value())
873+
);
874+
assert_eq!(
875+
Some(Value::Integer(&IntegerValueStorage::new(1))),
876+
response_summaries[1]
877+
.attributes
878+
.get("Count")
879+
.map(|v| v.to_value())
880+
);
881+
882+
let (included_records, dropped_records) = response.into_otlp_bytes().unwrap();
883+
884+
assert!(!included_records.is_empty());
885+
assert!(!dropped_records.is_empty());
886+
}
887+
888+
#[test]
889+
fn test_process_parsed_export_logs_service_request_summary_field_mapping_post_pipeline() {
890+
let request = ExportLogsServiceRequest::new().with_resource_logs(
891+
ResourceLogs::new().with_scope_logs(
892+
ScopeLogs::new()
893+
.with_log_record(LogRecord::new().with_severity_text("INFO".into()))
894+
.with_log_record(LogRecord::new().with_severity_text("WARN".into())),
895+
),
896+
);
897+
898+
let pipeline =
899+
parse_kql_query_into_pipeline("source | summarize Count = count() by severityText | extend body = 'hello world', attr1 = 'goodbye world'", None).unwrap();
900+
901+
let mut response = process_export_logs_service_request_using_pipeline(
902+
&pipeline,
903+
RecordSetEngineDiagnosticLevel::Verbose,
904+
request,
905+
)
906+
.unwrap();
907+
908+
assert_eq!(2, response.included_record_count);
909+
assert_eq!(2, response.dropped_record_count);
910+
911+
let response_otlp = response.included_records.as_mut().unwrap();
912+
913+
let response_summaries = &mut response_otlp.resource_logs[1].scope_logs[0].log_records;
914+
915+
response_summaries.sort_by(|l, r| {
916+
let l = l
917+
.severity_text
918+
.as_ref()
919+
.map(|v| v.get_value())
920+
.unwrap_or("");
921+
let r = r
922+
.severity_text
923+
.as_ref()
924+
.map(|v| v.get_value())
925+
.unwrap_or("");
926+
l.cmp(r)
927+
});
928+
929+
assert_eq!(
930+
Some("INFO"),
931+
response_summaries[0]
932+
.severity_text
933+
.as_ref()
934+
.map(|v| v.get_value())
935+
);
936+
assert_eq!(
937+
Some(Value::Integer(&IntegerValueStorage::new(1))),
938+
response_summaries[0]
939+
.attributes
940+
.get("Count")
941+
.map(|v| v.to_value())
942+
);
943+
assert_eq!(
944+
Some(Value::String(&StringValueStorage::new(
945+
"hello world".into()
946+
))),
947+
response_summaries[0].body.as_ref().map(|v| v.to_value())
948+
);
949+
assert_eq!(
950+
Some(Value::String(&StringValueStorage::new(
951+
"goodbye world".into()
952+
))),
953+
response_summaries[0]
954+
.attributes
955+
.get("attr1")
956+
.map(|v| v.to_value())
957+
);
958+
959+
assert_eq!(
960+
Some("WARN"),
961+
response_summaries[1]
962+
.severity_text
963+
.as_ref()
964+
.map(|v| v.get_value())
965+
);
966+
assert_eq!(
967+
Some(Value::Integer(&IntegerValueStorage::new(1))),
968+
response_summaries[1]
969+
.attributes
970+
.get("Count")
971+
.map(|v| v.to_value())
972+
);
973+
assert_eq!(
974+
Some(Value::String(&StringValueStorage::new(
975+
"hello world".into()
976+
))),
977+
response_summaries[1].body.as_ref().map(|v| v.to_value())
978+
);
979+
assert_eq!(
980+
Some(Value::String(&StringValueStorage::new(
981+
"goodbye world".into()
982+
))),
983+
response_summaries[1]
984+
.attributes
985+
.get("attr1")
986+
.map(|v| v.to_value())
987+
);
988+
989+
let (included_records, dropped_records) = response.into_otlp_bytes().unwrap();
990+
991+
assert!(!included_records.is_empty());
992+
assert!(!dropped_records.is_empty());
993+
}
994+
787995
#[test]
788996
fn test_parse_kql_query_into_pipeline_with_attributes_schema() {
789997
let run_test_success = |query: &str| {

0 commit comments

Comments
 (0)