Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
125 changes: 124 additions & 1 deletion quickwit/quickwit-config/src/source_config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ impl crate::TestableForRegression for SourceConfig {
transform_config: Some(TransformConfig {
vrl_script: ".message = downcase(string!(.message))".to_string(),
timezone: default_timezone(),
drop_on_abort: default_drop_on_abort(),
}),
input_format: SourceInputFormat::Json,
}
Expand Down Expand Up @@ -633,22 +634,41 @@ pub struct TransformConfig {
/// manipulations. Defaults to `UTC` if not timezone is specified.
#[serde(default = "default_timezone")]
timezone: String,

/// When `true` (the default, matching Vector semantics), documents that are explicitly
/// terminated via the VRL `abort` expression are dropped silently and routed to the
/// `transform_abort` counter. When `false`, an aborted document is treated identically
/// to an unexpected VRL runtime error: a warning is logged and the document is counted
/// as a transform error.
#[serde(default = "default_drop_on_abort")]
drop_on_abort: bool,
}

fn default_timezone() -> String {
"UTC".to_string()
}

fn default_drop_on_abort() -> bool {
true
}

impl TransformConfig {
/// Creates a new [`TransformConfig`] instance from the provided VRL script and optional
/// timezone.
/// timezone. `drop_on_abort` defaults to `true`.
pub fn new(vrl_script: String, timezone_opt: Option<String>) -> Self {
Self {
vrl_script,
timezone: timezone_opt.unwrap_or_else(default_timezone),
drop_on_abort: default_drop_on_abort(),
}
}

/// Returns whether documents aborted via the VRL `abort` expression should be silently
/// dropped (no warn log, separate counter) instead of being treated as transform errors.
pub fn drop_on_abort(&self) -> bool {
self.drop_on_abort
}

#[cfg(feature = "vrl")]
pub(crate) fn validate_vrl_script(&self) -> anyhow::Result<()> {
self.compile_vrl_script()?;
Expand Down Expand Up @@ -709,8 +729,15 @@ impl TransformConfig {
Self {
vrl_script: vrl_script.to_string(),
timezone: default_timezone(),
drop_on_abort: default_drop_on_abort(),
}
}

#[cfg(any(test, feature = "testsuite"))]
pub fn with_drop_on_abort(mut self, drop_on_abort: bool) -> Self {
self.drop_on_abort = drop_on_abort;
self
}
}

#[cfg(test)]
Expand Down Expand Up @@ -754,6 +781,7 @@ mod tests {
transform_config: Some(TransformConfig {
vrl_script: ".message = downcase(string!(.message))".to_string(),
timezone: "local".to_string(),
drop_on_abort: default_drop_on_abort(),
}),
input_format: SourceInputFormat::Json,
};
Expand Down Expand Up @@ -849,6 +877,7 @@ mod tests {
transform_config: Some(TransformConfig {
vrl_script: ".message = downcase(string!(.message))".to_string(),
timezone: "local".to_string(),
drop_on_abort: default_drop_on_abort(),
}),
input_format: SourceInputFormat::Json,
};
Expand Down Expand Up @@ -1355,6 +1384,7 @@ mod tests {
transform_config: Some(TransformConfig {
vrl_script: ".message = downcase(string!(.message))".to_string(),
timezone: default_timezone(),
drop_on_abort: default_drop_on_abort(),
}),
input_format: SourceInputFormat::Json,
};
Expand All @@ -1368,6 +1398,7 @@ mod tests {
let transform_config = TransformConfig {
vrl_script: ".message = downcase(string!(.message))".to_string(),
timezone: "local".to_string(),
drop_on_abort: default_drop_on_abort(),
};
let transform_config_yaml = serde_yaml::to_string(&transform_config).unwrap();
assert_eq!(
Expand All @@ -1379,6 +1410,20 @@ mod tests {
let transform_config = TransformConfig {
vrl_script: ".message = downcase(string!(.message))".to_string(),
timezone: default_timezone(),
drop_on_abort: default_drop_on_abort(),
};
let transform_config_yaml = serde_yaml::to_string(&transform_config).unwrap();
assert_eq!(
serde_yaml::from_str::<TransformConfig>(&transform_config_yaml).unwrap(),
transform_config,
);
}
{
// Round-trip with drop_on_abort explicitly set to false.
let transform_config = TransformConfig {
vrl_script: "abort".to_string(),
timezone: default_timezone(),
drop_on_abort: false,
};
let transform_config_yaml = serde_yaml::to_string(&transform_config).unwrap();
assert_eq!(
Expand All @@ -1400,8 +1445,11 @@ mod tests {
let expected_transform_config = TransformConfig {
vrl_script: ".message = downcase(string!(.message))".to_string(),
timezone: default_timezone(),
drop_on_abort: default_drop_on_abort(),
};
assert_eq!(transform_config, expected_transform_config);
// The default for drop_on_abort is true (Vector semantics).
assert!(transform_config.drop_on_abort());
}
{
let transform_config_yaml = r#"
Expand All @@ -1414,9 +1462,79 @@ mod tests {
let expected_transform_config = TransformConfig {
vrl_script: ".message = downcase(string!(.message))".to_string(),
timezone: "Turkey".to_string(),
drop_on_abort: default_drop_on_abort(),
};
assert_eq!(transform_config, expected_transform_config);
}
{
// Explicit drop_on_abort: false.
let transform_config_yaml = r#"
script: abort
drop_on_abort: false
"#;
let transform_config =
serde_yaml::from_str::<TransformConfig>(transform_config_yaml).unwrap();
assert!(!transform_config.drop_on_abort());
assert_eq!(transform_config.timezone, default_timezone());
}
{
// Explicit drop_on_abort: true (matches default).
let transform_config_yaml = r#"
script: abort
drop_on_abort: true
"#;
let transform_config =
serde_yaml::from_str::<TransformConfig>(transform_config_yaml).unwrap();
assert!(transform_config.drop_on_abort());
}
{
// Unknown fields are rejected by deny_unknown_fields.
let transform_config_yaml = r#"
script: .message = .message
unknown_field: 42
"#;
assert!(serde_yaml::from_str::<TransformConfig>(transform_config_yaml).is_err());
}
{
// Wrong type for `drop_on_abort` (string instead of bool) must fail to parse —
// we don't want a typo silently degrading to the default.
let transform_config_yaml = r#"
script: abort
drop_on_abort: "true"
"#;
let err = serde_yaml::from_str::<TransformConfig>(transform_config_yaml).unwrap_err();
let err_str = err.to_string();
assert!(
err_str.contains("drop_on_abort") || err_str.contains("bool"),
"expected a type-mismatch error mentioning drop_on_abort or bool, got: {err_str}"
);
}
{
// JSON deserialization works the same as YAML (parity check).
let transform_config_json = r#"{
"script": "abort",
"drop_on_abort": false
}"#;
let transform_config: TransformConfig =
serde_json::from_str(transform_config_json).unwrap();
assert!(!transform_config.drop_on_abort());
}
{
// JSON: missing field also applies the default.
let transform_config_json = r#"{
"script": ".message = .message"
}"#;
let transform_config: TransformConfig =
serde_json::from_str(transform_config_json).unwrap();
assert!(transform_config.drop_on_abort());
}
}

#[test]
fn test_transform_config_new_defaults_drop_on_abort_to_true() {
let transform_config = TransformConfig::new(".body = .body".to_string(), None);
assert!(transform_config.drop_on_abort());
assert_eq!(transform_config.timezone, default_timezone());
}

#[cfg(feature = "vrl")]
Expand All @@ -1426,6 +1544,7 @@ mod tests {
let transform_config = TransformConfig {
vrl_script: ".message = downcase(string!(.message))".to_string(),
timezone: "Turkey".to_string(),
drop_on_abort: default_drop_on_abort(),
};
transform_config.compile_vrl_script().unwrap();
}
Expand All @@ -1439,13 +1558,15 @@ mod tests {
"#
.to_string(),
timezone: default_timezone(),
drop_on_abort: default_drop_on_abort(),
};
transform_config.compile_vrl_script().unwrap();
}
{
let transform_config = TransformConfig {
vrl_script: ".message = downcase(string!(.message))".to_string(),
timezone: "foo".to_string(),
drop_on_abort: default_drop_on_abort(),
};
let error = transform_config.compile_vrl_script().unwrap_err();
assert!(error.to_string().starts_with("failed to parse timezone"));
Expand All @@ -1454,6 +1575,7 @@ mod tests {
let transform_config = TransformConfig {
vrl_script: "foo".to_string(),
timezone: "Turkey".to_string(),
drop_on_abort: default_drop_on_abort(),
};
let error = transform_config.compile_vrl_script().unwrap_err();
assert!(error.to_string().starts_with("failed to compile"));
Expand Down Expand Up @@ -1506,6 +1628,7 @@ mod tests {
transform_config: Some(TransformConfig {
vrl_script: ".message = downcase(string!(.message))".to_string(),
timezone: "local".to_string(),
drop_on_abort: default_drop_on_abort(),
}),
input_format: SourceInputFormat::Json,
};
Expand Down
Loading