diff --git a/quickwit/quickwit-config/src/source_config/mod.rs b/quickwit/quickwit-config/src/source_config/mod.rs index 81811e9f47c..65bb16772cf 100644 --- a/quickwit/quickwit-config/src/source_config/mod.rs +++ b/quickwit/quickwit-config/src/source_config/mod.rs @@ -172,6 +172,7 @@ impl crate::TestableForRegression for SourceConfig { transform_config: Some(TransformConfig { vrl_script: ".message = downcase(string!(.message))".to_string(), timezone: default_timezone(), + drop_on_abort: default_drop_on_abort(), }), input_format: SourceInputFormat::Json, } @@ -633,22 +634,41 @@ pub struct TransformConfig { /// manipulations. Defaults to `UTC` if not timezone is specified. #[serde(default = "default_timezone")] timezone: String, + + /// When `true` (the default, matching Vector semantics), documents that are explicitly + /// terminated via the VRL `abort` expression are dropped silently and routed to the + /// `transform_abort` counter. When `false`, an aborted document is treated identically + /// to an unexpected VRL runtime error: a warning is logged and the document is counted + /// as a transform error. + #[serde(default = "default_drop_on_abort")] + drop_on_abort: bool, } fn default_timezone() -> String { "UTC".to_string() } +fn default_drop_on_abort() -> bool { + true +} + impl TransformConfig { /// Creates a new [`TransformConfig`] instance from the provided VRL script and optional - /// timezone. + /// timezone. `drop_on_abort` defaults to `true`. pub fn new(vrl_script: String, timezone_opt: Option) -> Self { Self { vrl_script, timezone: timezone_opt.unwrap_or_else(default_timezone), + drop_on_abort: default_drop_on_abort(), } } + /// Returns whether documents aborted via the VRL `abort` expression should be silently + /// dropped (no warn log, separate counter) instead of being treated as transform errors. + pub fn drop_on_abort(&self) -> bool { + self.drop_on_abort + } + #[cfg(feature = "vrl")] pub(crate) fn validate_vrl_script(&self) -> anyhow::Result<()> { self.compile_vrl_script()?; @@ -709,8 +729,15 @@ impl TransformConfig { Self { vrl_script: vrl_script.to_string(), timezone: default_timezone(), + drop_on_abort: default_drop_on_abort(), } } + + #[cfg(any(test, feature = "testsuite"))] + pub fn with_drop_on_abort(mut self, drop_on_abort: bool) -> Self { + self.drop_on_abort = drop_on_abort; + self + } } #[cfg(test)] @@ -754,6 +781,7 @@ mod tests { transform_config: Some(TransformConfig { vrl_script: ".message = downcase(string!(.message))".to_string(), timezone: "local".to_string(), + drop_on_abort: default_drop_on_abort(), }), input_format: SourceInputFormat::Json, }; @@ -849,6 +877,7 @@ mod tests { transform_config: Some(TransformConfig { vrl_script: ".message = downcase(string!(.message))".to_string(), timezone: "local".to_string(), + drop_on_abort: default_drop_on_abort(), }), input_format: SourceInputFormat::Json, }; @@ -1355,6 +1384,7 @@ mod tests { transform_config: Some(TransformConfig { vrl_script: ".message = downcase(string!(.message))".to_string(), timezone: default_timezone(), + drop_on_abort: default_drop_on_abort(), }), input_format: SourceInputFormat::Json, }; @@ -1368,6 +1398,7 @@ mod tests { let transform_config = TransformConfig { vrl_script: ".message = downcase(string!(.message))".to_string(), timezone: "local".to_string(), + drop_on_abort: default_drop_on_abort(), }; let transform_config_yaml = serde_yaml::to_string(&transform_config).unwrap(); assert_eq!( @@ -1379,6 +1410,20 @@ mod tests { let transform_config = TransformConfig { vrl_script: ".message = downcase(string!(.message))".to_string(), timezone: default_timezone(), + drop_on_abort: default_drop_on_abort(), + }; + let transform_config_yaml = serde_yaml::to_string(&transform_config).unwrap(); + assert_eq!( + serde_yaml::from_str::(&transform_config_yaml).unwrap(), + transform_config, + ); + } + { + // Round-trip with drop_on_abort explicitly set to false. + let transform_config = TransformConfig { + vrl_script: "abort".to_string(), + timezone: default_timezone(), + drop_on_abort: false, }; let transform_config_yaml = serde_yaml::to_string(&transform_config).unwrap(); assert_eq!( @@ -1400,8 +1445,11 @@ mod tests { let expected_transform_config = TransformConfig { vrl_script: ".message = downcase(string!(.message))".to_string(), timezone: default_timezone(), + drop_on_abort: default_drop_on_abort(), }; assert_eq!(transform_config, expected_transform_config); + // The default for drop_on_abort is true (Vector semantics). + assert!(transform_config.drop_on_abort()); } { let transform_config_yaml = r#" @@ -1414,9 +1462,79 @@ mod tests { let expected_transform_config = TransformConfig { vrl_script: ".message = downcase(string!(.message))".to_string(), timezone: "Turkey".to_string(), + drop_on_abort: default_drop_on_abort(), }; assert_eq!(transform_config, expected_transform_config); } + { + // Explicit drop_on_abort: false. + let transform_config_yaml = r#" + script: abort + drop_on_abort: false + "#; + let transform_config = + serde_yaml::from_str::(transform_config_yaml).unwrap(); + assert!(!transform_config.drop_on_abort()); + assert_eq!(transform_config.timezone, default_timezone()); + } + { + // Explicit drop_on_abort: true (matches default). + let transform_config_yaml = r#" + script: abort + drop_on_abort: true + "#; + let transform_config = + serde_yaml::from_str::(transform_config_yaml).unwrap(); + assert!(transform_config.drop_on_abort()); + } + { + // Unknown fields are rejected by deny_unknown_fields. + let transform_config_yaml = r#" + script: .message = .message + unknown_field: 42 + "#; + assert!(serde_yaml::from_str::(transform_config_yaml).is_err()); + } + { + // Wrong type for `drop_on_abort` (string instead of bool) must fail to parse — + // we don't want a typo silently degrading to the default. + let transform_config_yaml = r#" + script: abort + drop_on_abort: "true" + "#; + let err = serde_yaml::from_str::(transform_config_yaml).unwrap_err(); + let err_str = err.to_string(); + assert!( + err_str.contains("drop_on_abort") || err_str.contains("bool"), + "expected a type-mismatch error mentioning drop_on_abort or bool, got: {err_str}" + ); + } + { + // JSON deserialization works the same as YAML (parity check). + let transform_config_json = r#"{ + "script": "abort", + "drop_on_abort": false + }"#; + let transform_config: TransformConfig = + serde_json::from_str(transform_config_json).unwrap(); + assert!(!transform_config.drop_on_abort()); + } + { + // JSON: missing field also applies the default. + let transform_config_json = r#"{ + "script": ".message = .message" + }"#; + let transform_config: TransformConfig = + serde_json::from_str(transform_config_json).unwrap(); + assert!(transform_config.drop_on_abort()); + } + } + + #[test] + fn test_transform_config_new_defaults_drop_on_abort_to_true() { + let transform_config = TransformConfig::new(".body = .body".to_string(), None); + assert!(transform_config.drop_on_abort()); + assert_eq!(transform_config.timezone, default_timezone()); } #[cfg(feature = "vrl")] @@ -1426,6 +1544,7 @@ mod tests { let transform_config = TransformConfig { vrl_script: ".message = downcase(string!(.message))".to_string(), timezone: "Turkey".to_string(), + drop_on_abort: default_drop_on_abort(), }; transform_config.compile_vrl_script().unwrap(); } @@ -1439,6 +1558,7 @@ mod tests { "# .to_string(), timezone: default_timezone(), + drop_on_abort: default_drop_on_abort(), }; transform_config.compile_vrl_script().unwrap(); } @@ -1446,6 +1566,7 @@ mod tests { let transform_config = TransformConfig { vrl_script: ".message = downcase(string!(.message))".to_string(), timezone: "foo".to_string(), + drop_on_abort: default_drop_on_abort(), }; let error = transform_config.compile_vrl_script().unwrap_err(); assert!(error.to_string().starts_with("failed to parse timezone")); @@ -1454,6 +1575,7 @@ mod tests { let transform_config = TransformConfig { vrl_script: "foo".to_string(), timezone: "Turkey".to_string(), + drop_on_abort: default_drop_on_abort(), }; let error = transform_config.compile_vrl_script().unwrap_err(); assert!(error.to_string().starts_with("failed to compile")); @@ -1506,6 +1628,7 @@ mod tests { transform_config: Some(TransformConfig { vrl_script: ".message = downcase(string!(.message))".to_string(), timezone: "local".to_string(), + drop_on_abort: default_drop_on_abort(), }), input_format: SourceInputFormat::Json, }; diff --git a/quickwit/quickwit-indexing/src/actors/doc_processor.rs b/quickwit/quickwit-indexing/src/actors/doc_processor.rs index 559d392afba..72a1abe670c 100644 --- a/quickwit/quickwit-indexing/src/actors/doc_processor.rs +++ b/quickwit/quickwit-indexing/src/actors/doc_processor.rs @@ -92,6 +92,25 @@ pub enum DocProcessorError { #[cfg(feature = "vrl")] #[error("VRL transform error: {0}")] Transform(VrlTerminate), + /// A VRL `abort` expression terminated the script while `drop_on_abort` was enabled + /// on the transform. This is an intentional drop, not an error — the document is + /// silently filtered out and routed to a separate counter. + #[cfg(feature = "vrl")] + #[error("VRL transform aborted (drop_on_abort): {0}")] + TransformAbort(VrlTerminate), +} + +impl DocProcessorError { + /// Whether this failure should produce a warn log. Returns `false` for intentional + /// drops (`TransformAbort`, triggered when VRL `abort` fires while `drop_on_abort` + /// is enabled) so they remain silent end-to-end. + fn should_warn(&self) -> bool { + match self { + #[cfg(feature = "vrl")] + DocProcessorError::TransformAbort(_) => false, + _ => true, + } + } } impl From for DocProcessorError { @@ -314,17 +333,18 @@ pub struct DocProcessorCounters { index_id: IndexId, source_id: SourceId, - /// Overall number of documents received, partitioned - /// into 5 categories: + /// Overall number of documents received, partitioned into categories: /// - valid documents - /// - number of docs that could not be parsed. + /// - number of docs that the doc mapper rejected. /// - number of docs that were not valid json. - /// - number of docs that could not be transformed. - /// - number of docs for which the doc mapper returned an error. - /// - number of valid docs. + /// - number of docs that failed to parse as OTLP logs/traces. + /// - number of docs that the VRL transform errored on. + /// - number of docs that were intentionally dropped via VRL `abort` (with `drop_on_abort` + /// enabled). pub valid: DocProcessorCounter, pub doc_mapper_errors: DocProcessorCounter, pub transform_errors: DocProcessorCounter, + pub transform_aborts: DocProcessorCounter, pub json_parse_errors: DocProcessorCounter, pub otlp_parse_errors: DocProcessorCounter, @@ -343,6 +363,8 @@ impl DocProcessorCounters { DocProcessorCounter::for_index_and_doc_processor_outcome(&index_id, "doc_mapper_error"); let transform_errors = DocProcessorCounter::for_index_and_doc_processor_outcome(&index_id, "transform_error"); + let transform_aborts = + DocProcessorCounter::for_index_and_doc_processor_outcome(&index_id, "transform_abort"); let json_parse_errors = DocProcessorCounter::for_index_and_doc_processor_outcome(&index_id, "json_parse_error"); let otlp_parse_errors = @@ -354,13 +376,21 @@ impl DocProcessorCounters { valid: valid_docs, doc_mapper_errors, transform_errors, + transform_aborts, json_parse_errors, otlp_parse_errors, num_bytes_total: Default::default(), } } - /// Returns the overall number of docs that went through the indexer (valid or not). + /// Returns the overall number of docs that contributed to indexing throughput + /// (valid or invalid). + /// + /// Docs intentionally dropped via VRL `abort` (with `drop_on_abort` enabled) are NOT + /// included here — they are tracked separately via [`Self::num_dropped_docs`]. This + /// preserves the downstream invariant that + /// `num_processed_docs() - num_invalid_docs() == number_of_docs_actually_indexed`, + /// which the CLI/API uses for throughput reporting. pub fn num_processed_docs(&self) -> u64 { self.valid.get_num_docs() + self.doc_mapper_errors.get_num_docs() @@ -369,9 +399,11 @@ impl DocProcessorCounters { + self.transform_errors.get_num_docs() } - /// Returns the overall number of docs that were sent to the indexer but were invalid. - /// (For instance, because they were missing a required field or because their because - /// their format was invalid) + /// Returns the overall number of docs that were sent to the indexer but were invalid + /// (for instance, missing a required field or malformed payload). + /// + /// Documents dropped by an intentional VRL `abort` are NOT counted here — they are + /// surfaced via [`Self::num_dropped_docs`] instead. pub fn num_invalid_docs(&self) -> u64 { self.doc_mapper_errors.get_num_docs() + self.json_parse_errors.get_num_docs() @@ -379,6 +411,12 @@ impl DocProcessorCounters { + self.transform_errors.get_num_docs() } + /// Returns the number of docs intentionally dropped by a VRL `abort` expression + /// while `drop_on_abort` was enabled. + pub fn num_dropped_docs(&self) -> u64 { + self.transform_aborts.get_num_docs() + } + pub fn record_valid(&self, num_bytes: u64) { self.num_bytes_total.fetch_add(num_bytes, Ordering::Relaxed); self.valid.record_doc(num_bytes); @@ -400,6 +438,10 @@ impl DocProcessorCounters { DocProcessorError::Transform(_) => { self.transform_errors.record_doc(num_bytes); } + #[cfg(feature = "vrl")] + DocProcessorError::TransformAbort(_) => { + self.transform_aborts.record_doc(num_bytes); + } }; } } @@ -480,12 +522,14 @@ impl DocProcessor { processed_docs.push(processed_doc); } Err(error) => { - rate_limited_warn!( - limit_per_min = 10, - index_id = self.counters.index_id, - source_id = self.counters.source_id, - "{error}", - ); + if error.should_warn() { + rate_limited_warn!( + limit_per_min = 10, + index_id = self.counters.index_id, + source_id = self.counters.source_id, + "{error}", + ); + } self.counters.record_error(error, num_bytes as u64); } } @@ -1341,4 +1385,399 @@ mod tests_vrl { ); universe.assert_quit().await; } + + /// Build a `DocProcessor` configured with the given VRL `script` and `drop_on_abort` + /// setting. Returns the running mailbox/handle plus the indexer inbox so tests can + /// inspect both the counters and the downstream batches. + async fn build_vrl_doc_processor( + universe: &Universe, + script: &str, + drop_on_abort: bool, + ) -> ( + quickwit_actors::Mailbox, + quickwit_actors::ActorHandle, + quickwit_actors::Inbox, + ) { + let (indexer_mailbox, indexer_inbox) = universe.create_test_mailbox::(); + let doc_mapper = Arc::new(default_doc_mapper_for_test()); + let transform_config = TransformConfig::for_test(script).with_drop_on_abort(drop_on_abort); + let doc_processor = DocProcessor::try_new( + "my-index".to_string(), + "my-source".to_string(), + doc_mapper, + indexer_mailbox, + Some(transform_config), + SourceInputFormat::Json, + ) + .unwrap(); + let (mailbox, handle) = universe.spawn_builder().spawn(doc_processor); + (mailbox, handle, indexer_inbox) + } + + /// A doc unconditionally `abort`ed with `drop_on_abort=true` must: + /// - increment `transform_aborts`, NOT `transform_errors` + /// - NOT show up in `num_invalid_docs()` (intentional drop, not invalid) + /// - NOT show up in `num_processed_docs()` either, so the downstream invariant `indexed = + /// num_processed_docs - num_invalid_docs` doesn't inflate (Codex #6472) + /// - still increment `num_bytes_total` (bytes did flow through the processor) + /// - not be forwarded to the indexer + #[tokio::test] + async fn test_doc_processor_vrl_abort_with_drop_on_abort_true() -> anyhow::Result<()> { + let universe = Universe::with_accelerated_time(); + let (mailbox, handle, indexer_inbox) = + build_vrl_doc_processor(&universe, "abort", true).await; + let payload = + br#"{"body": "drop me", "timestamp": 1628837062, "response_date": "2021-12-19T16:39:59+00:00", "response_time": 2, "response_payload": "YWJj"}"#; + let num_bytes_doc = payload.len() as u64; + mailbox + .send_message(RawDocBatch::for_test(&[payload], 0..1)) + .await?; + let counters = handle.process_pending_and_observe().await.state; + + assert_eq!(counters.transform_aborts.get_num_docs(), 1); + assert_eq!(counters.transform_errors.get_num_docs(), 0); + assert_eq!(counters.valid.get_num_docs(), 0); + assert_eq!(counters.doc_mapper_errors.get_num_docs(), 0); + assert_eq!(counters.json_parse_errors.get_num_docs(), 0); + + assert_eq!(counters.num_invalid_docs(), 0); + assert_eq!(counters.num_dropped_docs(), 1); + assert_eq!( + counters.num_processed_docs(), + 0, + "aborts must NOT count toward num_processed_docs — they would inflate the CLI's \ + indexed-doc total" + ); + assert_eq!( + counters.num_bytes_total.load(Ordering::Relaxed), + num_bytes_doc + ); + + // Nothing is forwarded to the indexer. + let downstream = indexer_inbox.drain_for_test(); + let total_downstream_docs: usize = downstream + .into_iter() + .filter_map(|msg| msg.downcast::().ok()) + .map(|batch| batch.docs.len()) + .sum(); + assert_eq!(total_downstream_docs, 0); + universe.assert_quit().await; + Ok(()) + } + + /// Same script but with `drop_on_abort=false`: the abort is now treated as a transform + /// error — counted in `transform_errors`, included in `num_invalid_docs()`. + #[tokio::test] + async fn test_doc_processor_vrl_abort_with_drop_on_abort_false() -> anyhow::Result<()> { + let universe = Universe::with_accelerated_time(); + let (mailbox, handle, _indexer_inbox) = + build_vrl_doc_processor(&universe, "abort", false).await; + let payload = + br#"{"body": "fail me", "timestamp": 1628837062, "response_date": "2021-12-19T16:39:59+00:00", "response_time": 2, "response_payload": "YWJj"}"#; + mailbox + .send_message(RawDocBatch::for_test(&[payload], 0..1)) + .await?; + let counters = handle.process_pending_and_observe().await.state; + + assert_eq!(counters.transform_errors.get_num_docs(), 1); + assert_eq!(counters.transform_aborts.get_num_docs(), 0); + assert_eq!(counters.valid.get_num_docs(), 0); + assert_eq!(counters.num_invalid_docs(), 1); + assert_eq!(counters.num_dropped_docs(), 0); + universe.assert_quit().await; + Ok(()) + } + + /// A VRL runtime error (not an explicit `abort`) must always count as a transform + /// error, regardless of `drop_on_abort`. + #[tokio::test] + async fn test_doc_processor_vrl_runtime_error_always_counts_as_transform_error() + -> anyhow::Result<()> { + // `string!(.body)` raises a runtime error when `.body` is not a string. + let script = ".body = upcase(string!(.body))"; + for drop_on_abort in [true, false] { + let universe = Universe::with_accelerated_time(); + let (mailbox, handle, _indexer_inbox) = + build_vrl_doc_processor(&universe, script, drop_on_abort).await; + let payload = br#"{"body": 42, "timestamp": 1628837062, "response_date": "2021-12-19T16:39:59+00:00", "response_time": 2, "response_payload": "YWJj"}"#; + mailbox + .send_message(RawDocBatch::for_test(&[payload], 0..1)) + .await?; + let counters = handle.process_pending_and_observe().await.state; + assert_eq!( + counters.transform_errors.get_num_docs(), + 1, + "runtime error must count as transform error (drop_on_abort={drop_on_abort})" + ); + assert_eq!( + counters.transform_aborts.get_num_docs(), + 0, + "runtime error must NOT count as transform abort (drop_on_abort={drop_on_abort})" + ); + universe.assert_quit().await; + } + Ok(()) + } + + /// Mixed batch with `drop_on_abort=true`: a conditional `abort` filters some docs, + /// the rest pass through. Counters split cleanly. + #[tokio::test] + async fn test_doc_processor_vrl_conditional_abort_filters_subset() -> anyhow::Result<()> { + let script = r#" + if .body == "drop" { + abort + } + "#; + let universe = Universe::with_accelerated_time(); + let (mailbox, handle, indexer_inbox) = + build_vrl_doc_processor(&universe, script, true).await; + mailbox + .send_message(RawDocBatch::for_test( + &[ + br#"{"body": "keep", "timestamp": 1628837062, "response_date": "2021-12-19T16:39:59+00:00", "response_time": 2, "response_payload": "YWJj"}"#, + br#"{"body": "drop", "timestamp": 1628837062, "response_date": "2021-12-19T16:39:59+00:00", "response_time": 2, "response_payload": "YWJj"}"#, + br#"{"body": "keep2", "timestamp": 1628837062, "response_date": "2021-12-19T16:39:59+00:00", "response_time": 2, "response_payload": "YWJj"}"#, + ], + 0..3, + )) + .await?; + let counters = handle.process_pending_and_observe().await.state; + assert_eq!(counters.valid.get_num_docs(), 2); + assert_eq!(counters.transform_aborts.get_num_docs(), 1); + assert_eq!(counters.transform_errors.get_num_docs(), 0); + assert_eq!(counters.num_invalid_docs(), 0); + assert_eq!(counters.num_dropped_docs(), 1); + // 2 valid; the aborted doc is NOT counted in num_processed_docs. + assert_eq!(counters.num_processed_docs(), 2); + + // Only the two "keep" docs are forwarded. + let output_messages = indexer_inbox.drain_for_test(); + let total_downstream_docs: usize = output_messages + .into_iter() + .filter_map(|msg| msg.downcast::().ok()) + .map(|batch| batch.docs.len()) + .sum(); + assert_eq!(total_downstream_docs, 2); + universe.assert_quit().await; + Ok(()) + } + + /// `record_error` must route each variant to the right counter — exercised directly + /// without going through the actor harness so the routing logic is locked in. + #[test] + fn test_doc_processor_counters_record_error_routes_correctly() { + let counters = DocProcessorCounters::new("idx".to_string(), "src".to_string()); + counters.record_error(DocProcessorError::JsonParsing("bad json".to_string()), 10); + counters.record_error( + DocProcessorError::DocMapperParsing(DocParsingError::RequiredField( + "missing field".to_string(), + )), + 20, + ); + + // Build a Terminate via a minimal VRL script that aborts. + let abort_program = TransformConfig::for_test("abort") + .compile_vrl_script() + .unwrap() + .0; + let mut runtime = + vrl::compiler::runtime::Runtime::new(vrl::compiler::state::RuntimeState::default()); + let mut value = VrlValue::Object(Default::default()); + let mut metadata = VrlValue::Object(Default::default()); + let mut secrets = VrlSecrets::default(); + let mut target = vrl::compiler::TargetValueRef { + value: &mut value, + metadata: &mut metadata, + secrets: &mut secrets, + }; + let abort_terminate = runtime + .resolve( + &mut target, + &abort_program, + &vrl::compiler::TimeZone::default(), + ) + .unwrap_err(); + assert!(matches!(abort_terminate, VrlTerminate::Abort(_))); + + // Same Terminate routed through both variants exercises both counter paths. + counters.record_error( + DocProcessorError::TransformAbort(abort_terminate.clone()), + 30, + ); + counters.record_error(DocProcessorError::Transform(abort_terminate), 40); + + assert_eq!(counters.json_parse_errors.get_num_docs(), 1); + assert_eq!(counters.doc_mapper_errors.get_num_docs(), 1); + assert_eq!(counters.transform_aborts.get_num_docs(), 1); + assert_eq!(counters.transform_errors.get_num_docs(), 1); + assert_eq!(counters.valid.get_num_docs(), 0); + assert_eq!(counters.num_bytes_total.load(Ordering::Relaxed), 100); + assert_eq!(counters.num_invalid_docs(), 3); + assert_eq!(counters.num_dropped_docs(), 1); + // 1 json_parse + 1 doc_mapper + 1 transform_error = 3; the abort is NOT included. + assert_eq!(counters.num_processed_docs(), 3); + // CLI/API math `indexed = processed - invalid` gives 0 (no valid docs in this + // synthetic test), proving aborts don't slip in. + assert_eq!( + counters.num_processed_docs() - counters.num_invalid_docs(), + 0, + ); + } + + /// Codex review finding (PR #6472): make sure aborts are silent end-to-end — + /// `should_warn()` must return false for `TransformAbort` (and true for every other + /// variant) so the `rate_limited_warn!` in `process_raw_doc` skips it. + #[test] + fn test_doc_processor_error_should_warn_excludes_transform_abort() { + // Aborts are silent. + let abort_program = TransformConfig::for_test("abort") + .compile_vrl_script() + .unwrap() + .0; + let mut runtime = + vrl::compiler::runtime::Runtime::new(vrl::compiler::state::RuntimeState::default()); + let mut value = VrlValue::Object(Default::default()); + let mut metadata = VrlValue::Object(Default::default()); + let mut secrets = VrlSecrets::default(); + let mut target = vrl::compiler::TargetValueRef { + value: &mut value, + metadata: &mut metadata, + secrets: &mut secrets, + }; + let abort_terminate = runtime + .resolve( + &mut target, + &abort_program, + &vrl::compiler::TimeZone::default(), + ) + .unwrap_err(); + let abort_err = DocProcessorError::TransformAbort(abort_terminate.clone()); + assert!( + !abort_err.should_warn(), + "TransformAbort must not produce a warn log" + ); + + // Every other variant warns. Exhaustive list mirrors `DocProcessorError`. + let warning_variants = [ + DocProcessorError::JsonParsing("bad".to_string()), + DocProcessorError::DocMapperParsing(DocParsingError::RequiredField( + "missing".to_string(), + )), + DocProcessorError::Transform(abort_terminate), + ]; + for err in warning_variants { + assert!(err.should_warn(), "non-abort variant must warn: {err}"); + } + } + + /// Mixed-category batch: exercises all four DocProcessor outcomes simultaneously + /// to lock in that counters and the indexer-forwarding behavior remain independent + /// — a VRL abort must not influence a sibling doc's json-parse or doc-mapper outcome. + /// + /// Batch composition: + /// - 1 valid doc (kept) + /// - 1 doc that triggers `abort` via `drop_on_abort=true` (transform_abort) + /// - 1 doc with malformed JSON (json_parse_error) + /// - 1 doc missing the required `timestamp` field (doc_mapper_error) + #[tokio::test] + async fn test_doc_processor_mixed_category_batch() -> anyhow::Result<()> { + let script = r#" + if .drop == true { abort } + "#; + let universe = Universe::with_accelerated_time(); + let (mailbox, handle, indexer_inbox) = + build_vrl_doc_processor(&universe, script, true).await; + mailbox + .send_message(RawDocBatch::for_test( + &[ + // valid + br#"{"body": "ok", "timestamp": 1628837062, "response_date": "2021-12-19T16:39:59+00:00", "response_time": 2, "response_payload": "YWJj"}"#, + // aborts via VRL + br#"{"body": "drop", "drop": true, "timestamp": 1628837063, "response_date": "2021-12-19T16:39:59+00:00", "response_time": 2, "response_payload": "YWJj"}"#, + // malformed JSON + b"{not-json", + // missing required `timestamp` + br#"{"body": "no-ts", "response_date": "2021-12-19T16:39:59+00:00", "response_time": 2, "response_payload": "YWJj"}"#, + ], + 0..4, + )) + .await?; + let counters = handle.process_pending_and_observe().await.state; + + assert_eq!(counters.valid.get_num_docs(), 1, "exactly 1 valid doc"); + assert_eq!( + counters.transform_aborts.get_num_docs(), + 1, + "exactly 1 VRL abort" + ); + assert_eq!( + counters.transform_errors.get_num_docs(), + 0, + "no transform_errors when drop_on_abort=true" + ); + assert_eq!( + counters.json_parse_errors.get_num_docs(), + 1, + "exactly 1 json parse error" + ); + assert_eq!( + counters.doc_mapper_errors.get_num_docs(), + 1, + "exactly 1 doc_mapper error (missing timestamp)" + ); + + assert_eq!( + counters.num_invalid_docs(), + 2, + "json + doc_mapper, not abort" + ); + assert_eq!(counters.num_dropped_docs(), 1, "the abort only"); + // 1 valid + 1 json + 1 doc_mapper = 3; the abort is excluded. + assert_eq!( + counters.num_processed_docs(), + 3, + "aborts excluded so CLI `indexed = processed - invalid` stays correct" + ); + // CLI/API math: indexed docs == num_processed_docs - num_invalid_docs. + // Exactly 1 doc was actually indexed (the "ok" doc). + assert_eq!( + counters.num_processed_docs() - counters.num_invalid_docs(), + 1, + "exactly 1 doc was indexed — aborts must not inflate this" + ); + + // Only the single valid doc reaches the indexer. + let output_messages = indexer_inbox.drain_for_test(); + let total_downstream_docs: usize = output_messages + .into_iter() + .filter_map(|msg| msg.downcast::().ok()) + .map(|batch| batch.docs.len()) + .sum(); + assert_eq!(total_downstream_docs, 1); + + // Pipe through IndexingStatistics to verify the aggregation surface matches what + // the CLI/API observes. Aborts must land in num_dropped_docs, NOT num_docs — + // otherwise downstream "indexed = num_docs - num_invalid_docs" math counts + // filtered docs as indexed (Codex review on PR #6472). + let stats = crate::models::IndexingStatistics::default().add_actor_counters( + &counters, + &crate::actors::IndexerCounters::default(), + &crate::actors::UploaderCounters { + num_staged_splits: Arc::new(AtomicU64::new(0)), + num_uploaded_splits: Arc::new(AtomicU64::new(0)), + }, + &crate::actors::PublisherCounters::default(), + ); + assert_eq!(stats.num_docs, 3, "num_docs excludes the abort"); + assert_eq!(stats.num_invalid_docs, 2, "json + doc_mapper"); + assert_eq!(stats.num_dropped_docs, 1, "the abort surfaces here"); + assert_eq!( + stats.num_docs - stats.num_invalid_docs, + 1, + "CLI: indexed == num_docs - num_invalid_docs; exactly 1 doc was indexed" + ); + + universe.assert_quit().await; + Ok(()) + } } diff --git a/quickwit/quickwit-indexing/src/actors/vrl_processing.rs b/quickwit/quickwit-indexing/src/actors/vrl_processing.rs index c441be3544a..f2f02f1ebd3 100644 --- a/quickwit/quickwit-indexing/src/actors/vrl_processing.rs +++ b/quickwit/quickwit-indexing/src/actors/vrl_processing.rs @@ -44,6 +44,11 @@ pub(super) struct VrlProgram { runtime: Runtime, metadata: VrlValue, secrets: VrlSecrets, + // When `true`, documents terminated by the VRL `abort` expression are dropped silently + // (no warn log) and routed to a distinct `DocProcessorError::TransformAbort` variant so + // they can be tracked separately from unexpected runtime errors. When `false`, aborts + // are treated like any other VRL runtime error: warned and counted as transform errors. + drop_on_abort: bool, } impl VrlProgram { @@ -58,12 +63,18 @@ impl VrlProgram { metadata: &mut self.metadata, secrets: &mut self.secrets, }; + let drop_on_abort = self.drop_on_abort; let runtime_res = self .runtime .resolve(&mut target, &self.program, &self.timezone) - .map_err(|transform_error| { - warn!(transform_error=?transform_error); - DocProcessorError::Transform(transform_error) + .map_err(|transform_error| match transform_error { + VrlTerminate::Abort(_) if drop_on_abort => { + DocProcessorError::TransformAbort(transform_error) + } + _ => { + warn!(transform_error=?transform_error); + DocProcessorError::Transform(transform_error) + } }); if let VrlValue::Object(metadata) = target.metadata { @@ -75,6 +86,7 @@ impl VrlProgram { } pub fn try_from_transform_config(transform_config: TransformConfig) -> anyhow::Result { + let drop_on_abort = transform_config.drop_on_abort(); let (program, timezone) = transform_config.compile_vrl_script()?; let state = RuntimeState::default(); let runtime = Runtime::new(state); @@ -85,6 +97,204 @@ impl VrlProgram { timezone, metadata: VrlValue::Object(BTreeMap::new()), secrets: VrlSecrets::default(), + drop_on_abort, }) } } + +#[cfg(test)] +mod tests { + use super::*; + + /// Classifies a `transform_doc` result so tests can assert on category without needing + /// `VrlDoc: Debug`. + #[derive(Debug, PartialEq, Eq)] + enum Outcome { + Ok, + TransformAbort, + TransformError, + DocMapperParsing, + JsonParsing, + OltpLogsParsing, + OltpTracesParsing, + } + + fn classify(result: &Result) -> Outcome { + match result { + Ok(_) => Outcome::Ok, + Err(DocProcessorError::TransformAbort(_)) => Outcome::TransformAbort, + Err(DocProcessorError::Transform(VrlTerminate::Abort(_))) => Outcome::TransformError, + Err(DocProcessorError::Transform(VrlTerminate::Error(_))) => Outcome::TransformError, + Err(DocProcessorError::DocMapperParsing(_)) => Outcome::DocMapperParsing, + Err(DocProcessorError::JsonParsing(_)) => Outcome::JsonParsing, + Err(DocProcessorError::OltpLogsParsing(_)) => Outcome::OltpLogsParsing, + Err(DocProcessorError::OltpTracesParsing(_)) => Outcome::OltpTracesParsing, + } + } + + fn is_explicit_abort(result: &Result) -> bool { + matches!( + result, + Err(DocProcessorError::TransformAbort(VrlTerminate::Abort(_))) + | Err(DocProcessorError::Transform(VrlTerminate::Abort(_))) + ) + } + + fn vrl_doc(json: serde_json::Value) -> VrlDoc { + let vrl_value = serde_json::from_value::(json).expect("valid VRL value"); + let num_bytes = 0; + VrlDoc::new(vrl_value, num_bytes) + } + + #[test] + fn test_successful_transform_passes_through() { + let transform_config = TransformConfig::for_test(".body = upcase(string!(.body))"); + let mut vrl_program = VrlProgram::try_from_transform_config(transform_config).unwrap(); + let result = vrl_program + .transform_doc(vrl_doc(serde_json::json!({"body": "hello"}))) + .expect("transform should succeed"); + let json = serde_json::to_value(result.vrl_value).unwrap(); + assert_eq!(json, serde_json::json!({"body": "HELLO"})); + } + + #[test] + fn test_abort_with_drop_on_abort_true_routes_to_transform_abort() { + let transform_config = TransformConfig::for_test("abort").with_drop_on_abort(true); + let mut vrl_program = VrlProgram::try_from_transform_config(transform_config).unwrap(); + let result = vrl_program.transform_doc(vrl_doc(serde_json::json!({"body": "x"}))); + assert_eq!(classify(&result), Outcome::TransformAbort); + assert!(is_explicit_abort(&result)); + } + + #[test] + fn test_abort_with_drop_on_abort_false_routes_to_transform_error() { + let transform_config = TransformConfig::for_test("abort").with_drop_on_abort(false); + let mut vrl_program = VrlProgram::try_from_transform_config(transform_config).unwrap(); + let result = vrl_program.transform_doc(vrl_doc(serde_json::json!({"body": "x"}))); + assert_eq!(classify(&result), Outcome::TransformError); + assert!(is_explicit_abort(&result)); + } + + #[test] + fn test_runtime_error_routes_to_transform_error_regardless_of_drop_on_abort() { + // `parse_json!` on a non-string raises a runtime error (Terminate::Error), + // which is distinct from an explicit `abort`. + let script = "parse_json!(.body)"; + for drop_on_abort in [true, false] { + let transform_config = + TransformConfig::for_test(script).with_drop_on_abort(drop_on_abort); + let mut vrl_program = VrlProgram::try_from_transform_config(transform_config).unwrap(); + let result = vrl_program.transform_doc(vrl_doc(serde_json::json!({"body": 42}))); + assert_eq!( + classify(&result), + Outcome::TransformError, + "runtime error must classify as TransformError (drop_on_abort={drop_on_abort})" + ); + assert!( + !is_explicit_abort(&result), + "runtime error must NOT be an Abort (drop_on_abort={drop_on_abort})" + ); + } + } + + #[test] + fn test_drop_on_abort_does_not_affect_successful_transforms() { + for drop_on_abort in [true, false] { + let transform_config = TransformConfig::for_test(".body = upcase(string!(.body))") + .with_drop_on_abort(drop_on_abort); + let mut vrl_program = VrlProgram::try_from_transform_config(transform_config).unwrap(); + let result = vrl_program + .transform_doc(vrl_doc(serde_json::json!({"body": "hello"}))) + .expect("transform should succeed regardless of drop_on_abort"); + let json = serde_json::to_value(result.vrl_value).unwrap(); + assert_eq!(json, serde_json::json!({"body": "HELLO"})); + } + } + + #[test] + fn test_conditional_abort_only_drops_matching_docs() { + // Validates the intended use case: filter-style scripts that abort on a predicate. + let script = r#" + if .drop == true { + abort + } + "#; + let transform_config = TransformConfig::for_test(script).with_drop_on_abort(true); + let mut vrl_program = VrlProgram::try_from_transform_config(transform_config).unwrap(); + + // Non-aborting doc passes through. + let kept = vrl_program + .transform_doc(vrl_doc(serde_json::json!({"body": "keep", "drop": false}))) + .expect("non-aborting doc should pass through"); + let kept_json = serde_json::to_value(kept.vrl_value).unwrap(); + assert_eq!( + kept_json, + serde_json::json!({"body": "keep", "drop": false}) + ); + + // Aborting doc returns TransformAbort. + let dropped = vrl_program.transform_doc(vrl_doc(serde_json::json!({"drop": true}))); + assert_eq!(classify(&dropped), Outcome::TransformAbort); + assert!(is_explicit_abort(&dropped)); + } + + #[test] + fn test_abort_with_message_routes_same_as_bare_abort() { + // VRL allows `abort "reason"` — verify message-bearing aborts route identically + // to bare aborts (same Terminate::Abort variant, same routing). + for (script, drop_on_abort, expected) in [ + ( + r#"abort "filtered as debug""#, + true, + Outcome::TransformAbort, + ), + ( + r#"abort "filtered as debug""#, + false, + Outcome::TransformError, + ), + ] { + let transform_config = + TransformConfig::for_test(script).with_drop_on_abort(drop_on_abort); + let mut vrl_program = VrlProgram::try_from_transform_config(transform_config).unwrap(); + let result = vrl_program.transform_doc(vrl_doc(serde_json::json!({"body": "x"}))); + assert_eq!( + classify(&result), + expected, + "abort-with-message must route same as bare abort (drop_on_abort={drop_on_abort})" + ); + assert!(is_explicit_abort(&result)); + } + } + + #[test] + fn test_transform_abort_error_display_includes_reason() { + // Display impl is used in logs and `?` propagation; the abort reason must surface. + let transform_config = + TransformConfig::for_test(r#"abort "explicit reason""#).with_drop_on_abort(true); + let mut vrl_program = VrlProgram::try_from_transform_config(transform_config).unwrap(); + let result = vrl_program.transform_doc(vrl_doc(serde_json::json!({"body": "x"}))); + let err = match result { + Ok(_) => panic!("expected an abort error"), + Err(err) => err, + }; + let rendered = format!("{err}"); + assert!( + rendered.contains("aborted") + || rendered.contains("abort") + || rendered.contains("explicit reason"), + "Display should describe an abort; got: {rendered}" + ); + } + + #[test] + fn test_num_bytes_is_preserved_through_transform() { + let transform_config = TransformConfig::for_test(".body = upcase(string!(.body))"); + let mut vrl_program = VrlProgram::try_from_transform_config(transform_config).unwrap(); + let vrl_value = + serde_json::from_value::(serde_json::json!({"body": "h"})).unwrap(); + let input = VrlDoc::new(vrl_value, 1234); + let output = vrl_program.transform_doc(input).unwrap(); + assert_eq!(output.num_bytes, 1234); + } +} diff --git a/quickwit/quickwit-indexing/src/models/indexing_statistics.rs b/quickwit/quickwit-indexing/src/models/indexing_statistics.rs index abe6f4a69df..abf8b226635 100644 --- a/quickwit/quickwit-indexing/src/models/indexing_statistics.rs +++ b/quickwit/quickwit-indexing/src/models/indexing_statistics.rs @@ -24,10 +24,15 @@ use crate::actors::{DocProcessorCounters, IndexerCounters, PublisherCounters, Up /// A Struct that holds all statistical data about indexing #[derive(Clone, Debug, Default, Serialize, utoipa::ToSchema)] pub struct IndexingStatistics { - /// Number of document processed (valid or not) + /// Number of document processed (valid or not). Excludes documents intentionally + /// dropped via VRL `abort` (see [`Self::num_dropped_docs`]). pub num_docs: u64, /// Number of document parse error, or missing timestamps pub num_invalid_docs: u64, + /// Number of documents intentionally dropped via the VRL `abort` expression while + /// `drop_on_abort` was enabled. These are intentional filters, not errors, and are + /// not counted in `num_docs` or `num_invalid_docs`. + pub num_dropped_docs: u64, /// Number of created split pub num_local_splits: u64, /// Number of staged splits @@ -64,6 +69,7 @@ impl IndexingStatistics { ) -> Self { self.num_docs += doc_processor_counters.num_processed_docs(); self.num_invalid_docs += doc_processor_counters.num_invalid_docs(); + self.num_dropped_docs += doc_processor_counters.num_dropped_docs(); self.num_local_splits += indexer_counters.num_splits_emitted; self.total_bytes_processed += doc_processor_counters .num_bytes_total