Skip to content

Commit 82a5521

Browse files
authored
[query-engine] Adjust logging for conversion operations in RecordSet engine (open-telemetry#2435)
Relates to open-telemetry#2430 # Changes * Pass along `SelectionOptions` to `execute_convert_scalar_expression` * Tiddy: * Reduce code duplication in conversion code * Implement `Copy` for `RecordSetEngineDiagnosticLevel` # Details We pass along `SelectionOptions` so that `toint(Something)` will emit a warn if "Something" isn't found but `coalese(tonint(Something), null)` will emit an info. This follows the pattern we've established so far when some outer function is used to handle the "not found" case. The assumption being "not found" is expected\anticipated in these cases so a warning is not necessary\overkill.
1 parent a44fe94 commit 82a5521

6 files changed

Lines changed: 95 additions & 94 deletions

File tree

rust/experimental/query_engine/engine-recordset-otlp-bridge/src/logs.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ impl RecordSet<LogRecord> for ExportLogsServiceRequest {
3535

3636
impl Record for LogRecord {
3737
fn get_diagnostic_level(&self) -> Option<RecordSetEngineDiagnosticLevel> {
38-
self.diagnostic_level.clone()
38+
self.diagnostic_level
3939
}
4040
}
4141

rust/experimental/query_engine/engine-recordset/src/engine.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,7 @@ impl<'a, 'b, TRecord: Record + 'static> RecordSetEngineBatch<'a, 'b, TRecord> {
135135
}
136136

137137
let execution_context = ExecutionContext::<TRecord>::new(
138-
self.engine.diagnostic_level.clone(),
138+
self.engine.diagnostic_level,
139139
&self.engine.external_function_implementations,
140140
self.pipeline,
141141
&self.global_variables,
@@ -201,7 +201,7 @@ impl<'a, 'b, TRecord: Record + 'static> RecordSetEngineBatch<'a, 'b, TRecord> {
201201
self.pipeline,
202202
self.diagnostics,
203203
process_summaries(
204-
self.engine.diagnostic_level.clone(),
204+
self.engine.diagnostic_level,
205205
&self.engine.external_function_implementations,
206206
&self.global_variables,
207207
self.pipeline,
@@ -219,7 +219,7 @@ impl<'a, 'b, TRecord: Record + 'static> RecordSetEngineBatch<'a, 'b, TRecord> {
219219
) -> RecordSetEngineResult<'a, TRecord> {
220220
let diagnostic_level = record
221221
.get_diagnostic_level()
222-
.unwrap_or(self.engine.diagnostic_level.clone());
222+
.unwrap_or(self.engine.diagnostic_level);
223223

224224
let execution_context = ExecutionContext::new(
225225
diagnostic_level,
@@ -351,7 +351,7 @@ fn process_summaries<'a>(
351351
let summaries = Summaries::new(1);
352352

353353
let execution_context = ExecutionContext::new(
354-
diagnostic_level.clone(),
354+
diagnostic_level,
355355
external_function_implementations,
356356
pipeline,
357357
global_variables,
@@ -385,7 +385,7 @@ fn process_summaries<'a>(
385385
}
386386

387387
let results = process_summaries(
388-
diagnostic_level.clone(),
388+
diagnostic_level,
389389
external_function_implementations,
390390
global_variables,
391391
pipeline,

rust/experimental/query_engine/engine-recordset/src/engine_diagnostic.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use std::str::FromStr;
55

66
use data_engine_expressions::Expression;
77

8-
#[derive(Default, Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
8+
#[derive(Default, Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord)]
99
pub enum RecordSetEngineDiagnosticLevel {
1010
#[default]
1111
Verbose = 0,
@@ -80,7 +80,7 @@ impl<'a> RecordSetEngineDiagnostic<'a> {
8080
}
8181

8282
pub fn get_diagnostic_level(&self) -> RecordSetEngineDiagnosticLevel {
83-
self.diagnostic_level.clone()
83+
self.diagnostic_level
8484
}
8585

8686
pub fn get_expression(&self) -> &dyn Expression {

rust/experimental/query_engine/engine-recordset/src/execution_context.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ impl<'a, 'b, TRecord: Record + 'static> ExecutionContext<'a, 'b, TRecord> {
6666
arguments: Option<&'b dyn ExecutionContextArguments>,
6767
) -> ExecutionContext<'a, 'b, MapValueStorage<OwnedValue>> {
6868
ExecutionContext::<MapValueStorage<OwnedValue>>::new(
69-
self.diagnostic_level.clone(),
69+
self.diagnostic_level,
7070
self.external_function_implementations,
7171
self.pipeline,
7272
self.get_variables().global_variables,

rust/experimental/query_engine/engine-recordset/src/scalars/convert_scalar_expressions.rs

Lines changed: 81 additions & 80 deletions
Original file line numberDiff line numberDiff line change
@@ -3,145 +3,115 @@
33

44
use data_engine_expressions::*;
55

6-
use crate::{execution_context::*, scalars::execute_scalar_expression, *};
6+
use crate::{execution_context::*, scalars::*, *};
77

88
pub fn execute_convert_scalar_expression<'a, 'b, 'c, TRecord: Record>(
99
execution_context: &'b ExecutionContext<'a, '_, TRecord>,
1010
convert_scalar_expression: &'a ConvertScalarExpression,
11+
selection_options: SelectionOptions,
1112
) -> Result<ResolvedValue<'c>, ExpressionError>
1213
where
1314
'a: 'c,
1415
'b: 'c,
1516
{
16-
let value = match convert_scalar_expression {
17-
ConvertScalarExpression::Boolean(c) => {
18-
let inner_value =
19-
execute_scalar_expression(execution_context, c.get_inner_expression())?;
17+
let (inner_expression, target_type) = unpack(convert_scalar_expression);
2018

21-
let value = inner_value.to_value();
19+
let resolved_inner_value = execute_scalar_expression_with_options(
20+
execution_context,
21+
inner_expression,
22+
selection_options,
23+
)?;
2224

23-
if let Some(b) = value.convert_to_bool() {
25+
let inner_value = resolved_inner_value.to_value();
26+
27+
let value = match target_type {
28+
ValueType::Boolean => {
29+
if let Some(b) = inner_value.convert_to_bool() {
2430
ResolvedValue::Computed(OwnedValue::Boolean(BooleanValueStorage::new(b)))
2531
} else {
26-
execution_context.add_diagnostic_if_enabled(
27-
RecordSetEngineDiagnosticLevel::Warn,
32+
emit_conversion_failure_diagnostic(
33+
execution_context,
2834
convert_scalar_expression,
29-
|| {
30-
format!(
31-
"Input of '{:?}' type could not be converted into a bool",
32-
value.get_value_type()
33-
)
34-
},
35+
&inner_value,
36+
"bool",
3537
);
3638

3739
ResolvedValue::Computed(OwnedValue::Null)
3840
}
3941
}
40-
ConvertScalarExpression::DateTime(c) => {
41-
let inner_value =
42-
execute_scalar_expression(execution_context, c.get_inner_expression())?;
43-
44-
let value = inner_value.to_value();
45-
46-
if let Some(d) = value.convert_to_datetime() {
42+
ValueType::DateTime => {
43+
if let Some(d) = inner_value.convert_to_datetime() {
4744
ResolvedValue::Computed(OwnedValue::DateTime(DateTimeValueStorage::new(d)))
4845
} else {
49-
execution_context.add_diagnostic_if_enabled(
50-
RecordSetEngineDiagnosticLevel::Warn,
46+
emit_conversion_failure_diagnostic(
47+
execution_context,
5148
convert_scalar_expression,
52-
|| {
53-
format!(
54-
"Input of '{:?}' type could not be converted into a DateTime",
55-
value.get_value_type()
56-
)
57-
},
49+
&inner_value,
50+
"DateTime",
5851
);
52+
5953
ResolvedValue::Computed(OwnedValue::Null)
6054
}
6155
}
62-
ConvertScalarExpression::Double(c) => {
63-
let inner_value =
64-
execute_scalar_expression(execution_context, c.get_inner_expression())?;
65-
66-
let value = inner_value.to_value();
67-
68-
if let Some(d) = value.convert_to_double() {
56+
ValueType::Double => {
57+
if let Some(d) = inner_value.convert_to_double() {
6958
ResolvedValue::Computed(OwnedValue::Double(DoubleValueStorage::new(d)))
7059
} else {
71-
execution_context.add_diagnostic_if_enabled(
72-
RecordSetEngineDiagnosticLevel::Warn,
60+
emit_conversion_failure_diagnostic(
61+
execution_context,
7362
convert_scalar_expression,
74-
|| {
75-
format!(
76-
"Input of '{:?}' type could not be converted into a double",
77-
value.get_value_type()
78-
)
79-
},
63+
&inner_value,
64+
"double",
8065
);
66+
8167
ResolvedValue::Computed(OwnedValue::Null)
8268
}
8369
}
84-
ConvertScalarExpression::Integer(c) => {
85-
let inner_value =
86-
execute_scalar_expression(execution_context, c.get_inner_expression())?;
87-
88-
let value = inner_value.to_value();
89-
90-
if let Some(i) = value.convert_to_integer() {
70+
ValueType::Integer => {
71+
if let Some(i) = inner_value.convert_to_integer() {
9172
ResolvedValue::Computed(OwnedValue::Integer(IntegerValueStorage::new(i)))
9273
} else {
93-
execution_context.add_diagnostic_if_enabled(
94-
RecordSetEngineDiagnosticLevel::Warn,
74+
emit_conversion_failure_diagnostic(
75+
execution_context,
9576
convert_scalar_expression,
96-
|| {
97-
format!(
98-
"Input of '{:?}' type could not be converted into an integer",
99-
value.get_value_type()
100-
)
101-
},
77+
&inner_value,
78+
"integer",
10279
);
80+
10381
ResolvedValue::Computed(OwnedValue::Null)
10482
}
10583
}
106-
ConvertScalarExpression::String(c) => {
107-
let v = execute_scalar_expression(execution_context, c.get_inner_expression())?;
108-
let value_type = v.get_value_type();
84+
ValueType::String => {
85+
let value_type = inner_value.get_value_type();
10986
if value_type == ValueType::String {
110-
v
87+
resolved_inner_value
11188
} else if value_type == ValueType::Null {
11289
ResolvedValue::Computed(OwnedValue::String(StringValueStorage::new("".into())))
11390
} else {
11491
let mut string_value = None;
115-
v.to_value().convert_to_string(&mut |s| {
92+
inner_value.convert_to_string(&mut |s| {
11693
string_value = Some(StringValueStorage::new(s.into()))
11794
});
11895
ResolvedValue::Computed(OwnedValue::String(
11996
string_value.expect("Inner value did not return a string"),
12097
))
12198
}
12299
}
123-
ConvertScalarExpression::TimeSpan(c) => {
124-
let inner_value =
125-
execute_scalar_expression(execution_context, c.get_inner_expression())?;
126-
127-
let value = inner_value.to_value();
128-
129-
if let Some(t) = value.convert_to_timespan() {
100+
ValueType::TimeSpan => {
101+
if let Some(t) = inner_value.convert_to_timespan() {
130102
ResolvedValue::Computed(OwnedValue::TimeSpan(TimeSpanValueStorage::new(t)))
131103
} else {
132-
execution_context.add_diagnostic_if_enabled(
133-
RecordSetEngineDiagnosticLevel::Warn,
104+
emit_conversion_failure_diagnostic(
105+
execution_context,
134106
convert_scalar_expression,
135-
|| {
136-
format!(
137-
"Input of '{:?}' type could not be converted into a TimeSpan",
138-
value.get_value_type()
139-
)
140-
},
107+
&inner_value,
108+
"TimeSpan",
141109
);
110+
142111
ResolvedValue::Computed(OwnedValue::Null)
143112
}
144113
}
114+
_ => unreachable!("Unexpected ValueType conversion"),
145115
};
146116

147117
execution_context.add_diagnostic_if_enabled(
@@ -153,6 +123,37 @@ where
153123
Ok(value)
154124
}
155125

126+
fn unpack(convert_scalar_expression: &ConvertScalarExpression) -> (&ScalarExpression, ValueType) {
127+
match convert_scalar_expression {
128+
ConvertScalarExpression::Boolean(c) => (c.get_inner_expression(), ValueType::Boolean),
129+
ConvertScalarExpression::DateTime(c) => (c.get_inner_expression(), ValueType::DateTime),
130+
ConvertScalarExpression::Double(c) => (c.get_inner_expression(), ValueType::Double),
131+
ConvertScalarExpression::Integer(c) => (c.get_inner_expression(), ValueType::Integer),
132+
ConvertScalarExpression::String(c) => (c.get_inner_expression(), ValueType::String),
133+
ConvertScalarExpression::TimeSpan(c) => (c.get_inner_expression(), ValueType::TimeSpan),
134+
}
135+
}
136+
137+
fn emit_conversion_failure_diagnostic<'a, TRecord: Record>(
138+
execution_context: &ExecutionContext<'a, '_, TRecord>,
139+
convert_scalar_expression: &'a ConvertScalarExpression,
140+
value: &Value<'_>,
141+
type_name: &str,
142+
) {
143+
if value.get_value_type() != ValueType::Null {
144+
execution_context.add_diagnostic_if_enabled(
145+
RecordSetEngineDiagnosticLevel::Warn,
146+
convert_scalar_expression,
147+
|| {
148+
format!(
149+
"Input of '{}' type could not be converted into a {type_name}",
150+
value.get_value_type()
151+
)
152+
},
153+
);
154+
}
155+
}
156+
156157
#[cfg(test)]
157158
mod tests {
158159
use chrono::{TimeZone, Utc};

rust/experimental/query_engine/engine-recordset/src/scalars/scalar_expressions.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -220,7 +220,7 @@ where
220220
}
221221
}
222222
ScalarExpression::Convert(c) => {
223-
return execute_convert_scalar_expression(execution_context, c);
223+
return execute_convert_scalar_expression(execution_context, c, selection_options);
224224
}
225225
ScalarExpression::Length(l) => {
226226
let inner_value =
@@ -652,7 +652,7 @@ where
652652
}
653653
Ok(None) => {
654654
execution_context.add_diagnostic_if_enabled(
655-
selection_options.selector_not_found_diagnostic_level.clone(),
655+
selection_options.selector_not_found_diagnostic_level,
656656
expression,
657657
|| format!("Could not find map key '{}' specified in accessor expression", map_key.get_value()),
658658
);
@@ -701,7 +701,7 @@ where
701701
}
702702
Ok(None) => {
703703
execution_context.add_diagnostic_if_enabled(
704-
selection_options.selector_not_found_diagnostic_level.clone(),
704+
selection_options.selector_not_found_diagnostic_level,
705705
expression,
706706
|| format!("Could not find array index '{index}' specified in accessor expression"),
707707
);
@@ -778,7 +778,7 @@ fn select_from_value<'a, 'b, TRecord: Record>(
778778
}
779779
None => {
780780
execution_context.add_diagnostic_if_enabled(
781-
selection_options.selector_not_found_diagnostic_level.clone(),
781+
selection_options.selector_not_found_diagnostic_level,
782782
expression,
783783
|| format!("Could not find map key '{}' specified in accessor expression", map_key.get_value()),
784784
);
@@ -819,7 +819,7 @@ fn select_from_value<'a, 'b, TRecord: Record>(
819819
}
820820
None => {
821821
execution_context.add_diagnostic_if_enabled(
822-
selection_options.selector_not_found_diagnostic_level.clone(),
822+
selection_options.selector_not_found_diagnostic_level,
823823
expression,
824824
|| format!("Could not find array index '{index}' specified in accessor expression"),
825825
);

0 commit comments

Comments
 (0)