Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -1103,7 +1103,7 @@ webhdfs-integration-tests = ["sinks-webhdfs"]
windows-event-log-integration-tests = ["sources-windows_event_log-integration-tests"]
disable-resolv-conf = []
shutdown-tests = ["api", "sinks-blackhole", "sinks-console", "sinks-prometheus", "sources", "transforms-lua", "transforms-remap", "unix"]
cli-tests = ["sinks-blackhole", "sinks-socket", "sources-demo_logs", "sources-file", "transforms-remap"]
cli-tests = ["sinks-blackhole", "sinks-socket", "sources-demo_logs", "sources-file", "transforms-remap", "transforms-filter", "transforms-aws_ec2_metadata"]
test-utils = ["vector-lib/test"]

# End-to-End testing-related features
Expand Down
3 changes: 3 additions & 0 deletions changelog.d/15037_validate_no_environment_vrl.fix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Fixed `vector validate --no-environment` so it reports VRL and condition compilation errors for transforms without requiring full environment-dependent component initialization.

authors: pront
8 changes: 8 additions & 0 deletions src/conditions/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,14 @@ impl AnyCondition {
AnyCondition::Map(m) => m.build(enrichment_tables, metrics_storage),
}
}

pub fn validate(
&self,
enrichment_tables: &vector_lib::enrichment::TableRegistry,
metrics_storage: &MetricsStorage,
) -> crate::Result<()> {
self.build(enrichment_tables, metrics_storage).map(|_| ())
}
}

impl From<ConditionConfig> for AnyCondition {
Expand Down
20 changes: 17 additions & 3 deletions src/config/transform.rs
Original file line number Diff line number Diff line change
Expand Up @@ -242,15 +242,29 @@ pub trait TransformConfig: DynClone + NamedComponent + core::fmt::Debug + Send +
) -> Vec<TransformOutput>;

/// Validates that the configuration of the transform is valid.
/// Validates structural constraints on the transform configuration that do not require
/// environment resources: reserved output names, duplicate route names, invalid sample
/// rates, and similar config-level checks. Called during config compilation so errors
/// are reported on both `vector validate` and normal startup/reload.
///
/// This would generally be where logical conditions were checked, such as ensuring a transform
/// isn't using a named output that matches a reserved output name, and so on.
/// # Errors
///
/// If validation does not succeed, an error variant containing a list of all validation errors
/// is returned.
fn validate(&self, _context: &TransformContext) -> Result<(), Vec<String>> {
Ok(())
}

/// Validates the transform configuration against environment resources: compiles VRL
/// programs, builds conditions, and resolves enrichment table references. Only called
/// from `vector validate` (via `validate_transforms`), not during normal startup because
/// `build()` performs equivalent checks with real resources.
///
/// # Errors
///
/// If validation does not succeed, an error variant containing a list of all validation errors
/// is returned.
fn validate(&self, _merged_definition: &schema::Definition) -> Result<(), Vec<String>> {
fn validate_env(&self, _context: &TransformContext) -> Result<(), Vec<String>> {
Ok(())
}
Comment thread
pront marked this conversation as resolved.

Expand Down
12 changes: 6 additions & 6 deletions src/config/validation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,9 @@ use indexmap::IndexMap;
use vector_lib::{buffers::config::DiskUsage, internal_event::DEFAULT_OUTPUT};

use super::{
ComponentKey, Config, OutputId, Resource, builder::ConfigBuilder,
ComponentKey, Config, OutputId, Resource, TransformContext, builder::ConfigBuilder,
transform::get_transform_output_ids,
};
use crate::config::schema;

/// Minimum value (exclusive) for EWMA alpha options.
/// The alpha value must be strictly greater than this value.
Expand Down Expand Up @@ -234,10 +233,11 @@ pub fn check_outputs(config: &ConfigBuilder) -> Result<(), Vec<String>> {
}

for (key, transform) in config.transforms.iter() {
// use the most general definition possible, since the real value isn't known yet.
let definition = schema::Definition::any();

if let Err(errs) = transform.inner.validate(&definition) {
// Structural validation: reserved names, duplicate routes, invalid sample rates.
// Uses a default context so transforms that require environment resources (VRL
// compilation, condition building) must guard on context.key being None and skip
// those checks — they run later in validate_transforms() with a real context.
if let Err(errs) = transform.inner.validate(&TransformContext::default()) {
errors.extend(errs.into_iter().map(|msg| format!("Transform {key} {msg}")));
}

Expand Down
18 changes: 18 additions & 0 deletions src/transforms/delay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,24 @@ impl TransformConfig for DelayConfig {
clone_input_definitions(input_definitions),
)]
}

fn validate(&self, _: &TransformContext) -> Result<(), Vec<String>> {
if self.delay_ms.as_millis() == 0 {
Err(vec!["delay must not be zero".to_string()])
} else {
Ok(())
}
}

fn validate_env(&self, context: &TransformContext) -> Result<(), Vec<String>> {
self.condition
.as_ref()
.map(|c| {
c.validate(&context.enrichment_tables, &context.metrics_storage)
.map_err(|e| vec![format!("condition: {e}")])
})
.unwrap_or(Ok(()))
}
}

pub struct Delay {
Expand Down
22 changes: 21 additions & 1 deletion src/transforms/exclusive_route/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ impl TransformConfig for ExclusiveRouteConfig {
Input::all()
}

fn validate(&self, _: &schema::Definition) -> Result<(), Vec<String>> {
fn validate(&self, _: &TransformContext) -> Result<(), Vec<String>> {
let mut errors = Vec::new();

let mut counts = std::collections::HashMap::new();
Expand Down Expand Up @@ -134,6 +134,26 @@ impl TransformConfig for ExclusiveRouteConfig {
}
}

fn validate_env(&self, context: &TransformContext) -> Result<(), Vec<String>> {
let errors: Vec<String> = self
.routes
.iter()
.filter_map(|route| {
route
.condition
.validate(&context.enrichment_tables, &context.metrics_storage)
.err()
.map(|e| format!("route \"{}\": {e}", route.name))
})
.collect();

if errors.is_empty() {
Ok(())
} else {
Err(errors)
}
}

fn outputs(
&self,
_: &TransformContext,
Expand Down
6 changes: 6 additions & 0 deletions src/transforms/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,12 @@ impl TransformConfig for FilterConfig {
)?)))
}

fn validate_env(&self, context: &TransformContext) -> Result<(), Vec<String>> {
self.condition
.validate(&context.enrichment_tables, &context.metrics_storage)
.map_err(|e| vec![e.to_string()])
}

fn input(&self) -> Input {
Input::all()
}
Expand Down
49 changes: 49 additions & 0 deletions src/transforms/reduce/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,55 @@ impl TransformConfig for ReduceConfig {

vec![TransformOutput::new(DataType::Log, output_definitions)]
}

fn validate(&self, _: &TransformContext) -> Result<(), Vec<String>> {
let mut errors = Vec::new();

if self.ends_when.is_some() && self.starts_when.is_some() {
errors.push("only one of `ends_when` and `starts_when` can be provided".to_string());
}

for (path, _) in &self.merge_strategies {
match parse_target_path(path) {
Err(_) => errors.push(format!("Could not parse path: `{path}`")),
Ok(parsed) if parsed.path.segments.iter().any(|s| s.is_index()) => {
errors.push(format!(
"Merge strategies with indexes are currently not supported. Path: `{path}`"
));
}
Ok(_) => {}
}
}

if errors.is_empty() {
Ok(())
} else {
Err(errors)
}
}

fn validate_env(&self, context: &TransformContext) -> Result<(), Vec<String>> {
let mut errors = Vec::new();
if let Some(Err(e)) = self
.ends_when
.as_ref()
.map(|c| c.validate(&context.enrichment_tables, &context.metrics_storage))
{
errors.push(format!("ends_when: {e}"));
}
if let Some(Err(e)) = self
.starts_when
.as_ref()
.map(|c| c.validate(&context.enrichment_tables, &context.metrics_storage))
{
errors.push(format!("starts_when: {e}"));
}
if errors.is_empty() {
Ok(())
} else {
Err(errors)
}
}
}

#[cfg(test)]
Expand Down
10 changes: 10 additions & 0 deletions src/transforms/remap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,16 @@ impl TransformConfig for RemapConfig {
Ok(transform)
}

fn validate_env(&self, context: &TransformContext) -> std::result::Result<(), Vec<String>> {
self.compile_vrl_program(
context.enrichment_tables.clone(),
context.metrics_storage.clone(),
context.merged_schema_definition.clone(),
)
.map(|_| ())
.map_err(|e| vec![e.to_string()])
}

fn input(&self) -> Input {
Input::all()
}
Expand Down
21 changes: 20 additions & 1 deletion src/transforms/route.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ impl TransformConfig for RouteConfig {
Input::all()
}

fn validate(&self, _: &schema::Definition) -> Result<(), Vec<String>> {
fn validate(&self, _: &TransformContext) -> Result<(), Vec<String>> {
if self.route.contains_key(UNMATCHED_ROUTE) {
Err(vec![format!(
"cannot have a named output with reserved name: `{UNMATCHED_ROUTE}`"
Expand All @@ -141,6 +141,25 @@ impl TransformConfig for RouteConfig {
}
}

fn validate_env(&self, context: &TransformContext) -> Result<(), Vec<String>> {
let errors: Vec<String> = self
.route
.iter()
.filter_map(|(name, condition)| {
condition
.validate(&context.enrichment_tables, &context.metrics_storage)
.err()
.map(|e| format!("route \"{name}\": {e}"))
})
.collect();

if errors.is_empty() {
Ok(())
} else {
Err(errors)
}
}

fn outputs(
&self,
_: &TransformContext,
Expand Down
20 changes: 18 additions & 2 deletions src/transforms/sample/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -222,12 +222,24 @@ impl TransformConfig for SampleConfig {
Input::new(DataType::Log | DataType::Trace)
}

fn validate(&self, _: &schema::Definition) -> Result<(), Vec<String>> {
fn validate(&self, _: &TransformContext) -> Result<(), Vec<String>> {
self.sample_rate()
.map(|_| ())
.map_err(|e| vec![e.to_string()])
}

fn validate_env(&self, context: &TransformContext) -> Result<(), Vec<String>> {
if let Some(Err(e)) = self
.exclude
.as_ref()
.map(|c| c.validate(&context.enrichment_tables, &context.metrics_storage))
{
Err(vec![format!("exclude: {e}")])
} else {
Ok(())
}
}

fn outputs(
&self,
_: &TransformContext,
Expand Down Expand Up @@ -317,7 +329,11 @@ mod tests {
exclude: None,
};

assert!(config.validate(&crate::schema::Definition::any()).is_ok());
assert!(
config
.validate(&crate::config::TransformContext::default())
.is_ok()
);
}

#[test]
Expand Down
22 changes: 22 additions & 0 deletions src/transforms/throttle/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,28 @@ impl TransformConfig for ThrottleConfig {
clone_input_definitions(input_definitions),
)]
}

fn validate(&self, _: &TransformContext) -> Result<(), Vec<String>> {
if self.threshold == 0 || self.window_secs.as_secs_f64() == 0.0 {
Err(vec![
"`threshold` and `window_secs` must be non-zero".to_string(),
])
} else {
Ok(())
}
}

fn validate_env(&self, context: &TransformContext) -> Result<(), Vec<String>> {
if let Some(Err(e)) = self
.exclude
.as_ref()
.map(|c| c.validate(&context.enrichment_tables, &context.metrics_storage))
{
Err(vec![format!("exclude: {e}")])
} else {
Ok(())
}
}
}

#[cfg(test)]
Expand Down
22 changes: 22 additions & 0 deletions src/transforms/window/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,4 +91,26 @@ impl TransformConfig for WindowConfig {
clone_input_definitions(input_definitions),
)]
}

fn validate_env(&self, context: &TransformContext) -> Result<(), Vec<String>> {
let mut errors = Vec::new();
if let Some(Err(e)) = self
.forward_when
.as_ref()
.map(|c| c.validate(&context.enrichment_tables, &context.metrics_storage))
{
errors.push(format!("forward_when: {e}"));
}
if let Err(e) = self
.flush_when
.validate(&context.enrichment_tables, &context.metrics_storage)
{
errors.push(format!("flush_when: {e}"));
}
if errors.is_empty() {
Ok(())
} else {
Err(errors)
}
}
}
Loading
Loading