Skip to content

Commit 6077a2f

Browse files
committed
fix(cli): validate transform VRL and conditions with --no-environment
vector validate --no-environment now catches VRL compilation errors and invalid condition expressions in transforms before any environment resources are available. Design: - Add validate(&TransformContext) to TransformConfig; remap and filter implement it to compile VRL programs and conditions respectively - route, exclusive_route, and sample also validate their condition fields - Introduce validate_transforms(), called in both --no-environment and normal validate paths, which builds stub enrichment tables from the config so VRL can resolve table name references without loading data - Remove the validate() call from check_outputs: that function is for graph/wiring checks only; transform compilation belongs in validate_transforms() - Short-circuit environment checks when transform validation fails to avoid running health checks against external services for known-invalid configs - Add transforms-filter and transforms-aws_ec2_metadata to cli-tests feature set so the new integration tests compile correctly - Convert all CLI integration test configs to YAML; add create_yaml_file helper so test files carry the .yaml extension Vector uses for format detection Fixes #15037
1 parent 6289586 commit 6077a2f

12 files changed

Lines changed: 376 additions & 71 deletions

File tree

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
Fixed `vector validate --no-environment` so it reports VRL and condition compilation errors for transforms without requiring full environment-dependent component initialization.
2+
3+
authors: pront

src/conditions/mod.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -210,6 +210,14 @@ impl AnyCondition {
210210
AnyCondition::Map(m) => m.build(enrichment_tables, metrics_storage),
211211
}
212212
}
213+
214+
pub fn validate(
215+
&self,
216+
enrichment_tables: &vector_lib::enrichment::TableRegistry,
217+
metrics_storage: &MetricsStorage,
218+
) -> crate::Result<()> {
219+
self.build(enrichment_tables, metrics_storage).map(|_| ())
220+
}
213221
}
214222

215223
impl From<ConditionConfig> for AnyCondition {

src/config/transform.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -250,7 +250,7 @@ pub trait TransformConfig: DynClone + NamedComponent + core::fmt::Debug + Send +
250250
///
251251
/// If validation does not succeed, an error variant containing a list of all validation errors
252252
/// is returned.
253-
fn validate(&self, _merged_definition: &schema::Definition) -> Result<(), Vec<String>> {
253+
fn validate(&self, _context: &TransformContext) -> Result<(), Vec<String>> {
254254
Ok(())
255255
}
256256

src/config/validation.rs

Lines changed: 13 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use std::{collections::HashMap, path::PathBuf};
22

3+
use crate::transforms::route::UNMATCHED_ROUTE;
34
use futures_util::{FutureExt, StreamExt, TryFutureExt, TryStreamExt, stream};
45
use heim::{disk::Partition, units::information::byte};
56
use indexmap::IndexMap;
@@ -9,7 +10,6 @@ use super::{
910
ComponentKey, Config, OutputId, Resource, builder::ConfigBuilder,
1011
transform::get_transform_output_ids,
1112
};
12-
use crate::config::schema;
1313

1414
/// Minimum value (exclusive) for EWMA alpha options.
1515
/// The alpha value must be strictly greater than this value.
@@ -234,23 +234,22 @@ pub fn check_outputs(config: &ConfigBuilder) -> Result<(), Vec<String>> {
234234
}
235235

236236
for (key, transform) in config.transforms.iter() {
237-
// use the most general definition possible, since the real value isn't known yet.
238-
let definition = schema::Definition::any();
239-
240-
if let Err(errs) = transform.inner.validate(&definition) {
241-
errors.extend(errs.into_iter().map(|msg| format!("Transform {key} {msg}")));
242-
}
243-
244-
if get_transform_output_ids(
237+
let output_ids = get_transform_output_ids(
245238
transform.inner.as_ref(),
246239
key.clone(),
247240
config.schema.log_namespace(),
248241
)
249-
.any(|output| matches!(output.port, Some(output) if output == DEFAULT_OUTPUT))
250-
{
251-
errors.push(format!(
252-
"Transform {key} cannot have a named output with reserved name: `{DEFAULT_OUTPUT}`"
253-
));
242+
.collect::<Vec<_>>();
243+
244+
for reserved in [DEFAULT_OUTPUT, UNMATCHED_ROUTE] {
245+
if output_ids
246+
.iter()
247+
.any(|output| matches!(&output.port, Some(port) if port == reserved))
248+
{
249+
errors.push(format!(
250+
"Transform {key} cannot have a named output with reserved name: `{reserved}`"
251+
));
252+
}
254253
}
255254
}
256255

src/transforms/exclusive_route/config.rs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ impl TransformConfig for ExclusiveRouteConfig {
101101
Input::all()
102102
}
103103

104-
fn validate(&self, _: &schema::Definition) -> Result<(), Vec<String>> {
104+
fn validate(&self, context: &TransformContext) -> Result<(), Vec<String>> {
105105
let mut errors = Vec::new();
106106

107107
let mut counts = std::collections::HashMap::new();
@@ -127,6 +127,15 @@ impl TransformConfig for ExclusiveRouteConfig {
127127
errors.push(format!("Using reserved '{UNMATCHED_ROUTE}' name."));
128128
}
129129

130+
for route in &self.routes {
131+
if let Err(e) = route
132+
.condition
133+
.validate(&context.enrichment_tables, &context.metrics_storage)
134+
{
135+
errors.push(format!("route \"{}\": {e}", route.name));
136+
}
137+
}
138+
130139
if errors.is_empty() {
131140
Ok(())
132141
} else {

src/transforms/filter.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,12 @@ impl TransformConfig for FilterConfig {
5050
)?)))
5151
}
5252

53+
fn validate(&self, context: &TransformContext) -> Result<(), Vec<String>> {
54+
self.condition
55+
.validate(&context.enrichment_tables, &context.metrics_storage)
56+
.map_err(|e| vec![e.to_string()])
57+
}
58+
5359
fn input(&self) -> Input {
5460
Input::all()
5561
}

src/transforms/remap.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -277,6 +277,16 @@ impl TransformConfig for RemapConfig {
277277
Ok(transform)
278278
}
279279

280+
fn validate(&self, context: &TransformContext) -> std::result::Result<(), Vec<String>> {
281+
self.compile_vrl_program(
282+
context.enrichment_tables.clone(),
283+
context.metrics_storage.clone(),
284+
context.merged_schema_definition.clone(),
285+
)
286+
.map(|_| ())
287+
.map_err(|e| vec![e.to_string()])
288+
}
289+
280290
fn input(&self) -> Input {
281291
Input::all()
282292
}

src/transforms/route.rs

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -131,13 +131,26 @@ impl TransformConfig for RouteConfig {
131131
Input::all()
132132
}
133133

134-
fn validate(&self, _: &schema::Definition) -> Result<(), Vec<String>> {
134+
fn validate(&self, context: &TransformContext) -> Result<(), Vec<String>> {
135+
let mut errors = Vec::new();
136+
135137
if self.route.contains_key(UNMATCHED_ROUTE) {
136-
Err(vec![format!(
138+
errors.push(format!(
137139
"cannot have a named output with reserved name: `{UNMATCHED_ROUTE}`"
138-
)])
139-
} else {
140+
));
141+
}
142+
143+
for (name, condition) in &self.route {
144+
if let Err(e) = condition.validate(&context.enrichment_tables, &context.metrics_storage)
145+
{
146+
errors.push(format!("route \"{name}\": {e}"));
147+
}
148+
}
149+
150+
if errors.is_empty() {
140151
Ok(())
152+
} else {
153+
Err(errors)
141154
}
142155
}
143156

src/transforms/sample/config.rs

Lines changed: 25 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -222,10 +222,26 @@ impl TransformConfig for SampleConfig {
222222
Input::new(DataType::Log | DataType::Trace)
223223
}
224224

225-
fn validate(&self, _: &schema::Definition) -> Result<(), Vec<String>> {
226-
self.sample_rate()
227-
.map(|_| ())
228-
.map_err(|e| vec![e.to_string()])
225+
fn validate(&self, context: &TransformContext) -> Result<(), Vec<String>> {
226+
let mut errors = self
227+
.sample_rate()
228+
.err()
229+
.map(|e| vec![e.to_string()])
230+
.unwrap_or_default();
231+
232+
if let Some(Err(e)) = self
233+
.exclude
234+
.as_ref()
235+
.map(|c| c.validate(&context.enrichment_tables, &context.metrics_storage))
236+
{
237+
errors.push(format!("exclude: {e}"));
238+
}
239+
240+
if errors.is_empty() {
241+
Ok(())
242+
} else {
243+
Err(errors)
244+
}
229245
}
230246

231247
fn outputs(
@@ -317,7 +333,11 @@ mod tests {
317333
exclude: None,
318334
};
319335

320-
assert!(config.validate(&crate::schema::Definition::any()).is_ok());
336+
assert!(
337+
config
338+
.validate(&crate::config::TransformContext::default())
339+
.is_ok()
340+
);
321341
}
322342

323343
#[test]

src/validate.rs

Lines changed: 120 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,68 @@
11
#![allow(missing_docs)]
22

3-
use std::{fmt, fs::remove_dir_all, path::PathBuf};
3+
use std::{collections::HashMap, fmt, fs::remove_dir_all, path::PathBuf};
44

55
use clap::Parser;
66
use colored::*;
77
use exitcode::ExitCode;
8+
use vector_lib::enrichment::{Case, IndexHandle, TableRegistry};
9+
use vector_vrl_metrics::MetricsStorage;
10+
use vrl::value::ObjectMap;
811

912
use crate::{
10-
config::{self, Config, ConfigDiff, loading::ConfigBuilderLoader},
13+
config::{self, Config, ConfigDiff, TransformContext, loading::ConfigBuilderLoader},
14+
schema::Definition,
1115
topology::{
1216
self,
1317
builder::{TopologyPieces, TopologyPiecesBuilder},
1418
},
1519
};
1620

21+
/// Stub enrichment table used during transform validation so VRL can resolve
22+
/// table name references without loading actual table data.
23+
#[derive(Clone)]
24+
struct StubEnrichmentTable;
25+
26+
impl vector_lib::enrichment::Table for StubEnrichmentTable {
27+
fn find_table_row<'a>(
28+
&self,
29+
_: Case,
30+
_: &'a [vector_lib::enrichment::Condition<'a>],
31+
_: Option<&[String]>,
32+
_: Option<&vrl::value::Value>,
33+
_: Option<IndexHandle>,
34+
) -> Result<ObjectMap, vector_lib::enrichment::Error> {
35+
unreachable!("stub table is compile-time only")
36+
}
37+
38+
fn find_table_rows<'a>(
39+
&self,
40+
_: Case,
41+
_: &'a [vector_lib::enrichment::Condition<'a>],
42+
_: Option<&[String]>,
43+
_: Option<&vrl::value::Value>,
44+
_: Option<IndexHandle>,
45+
) -> Result<Vec<ObjectMap>, vector_lib::enrichment::Error> {
46+
unreachable!("stub table is compile-time only")
47+
}
48+
49+
fn add_index(
50+
&mut self,
51+
_: Case,
52+
_: &[&str],
53+
) -> Result<IndexHandle, vector_lib::enrichment::Error> {
54+
Ok(IndexHandle(0))
55+
}
56+
57+
fn index_fields(&self) -> Vec<(Case, Vec<String>)> {
58+
vec![]
59+
}
60+
61+
fn needs_reload(&self) -> bool {
62+
false
63+
}
64+
}
65+
1766
const TEMPORARY_DIRECTORY: &str = "validate_tmp";
1867

1968
#[derive(Parser, Debug)]
@@ -117,6 +166,8 @@ pub async fn validate(opts: &Opts, color: bool) -> ExitCode {
117166
None => return exitcode::CONFIG,
118167
};
119168

169+
validated &= validate_transforms(&config, &mut fmt).await;
170+
120171
if !opts.no_environment {
121172
if let Some(tmp_directory) = create_tmp_directory(&mut config, &mut fmt) {
122173
validated &= validate_environment(opts, &config, &mut fmt).await;
@@ -180,6 +231,73 @@ pub fn validate_config(opts: &Opts, fmt: &mut Formatter) -> Option<Config> {
180231
Some(config)
181232
}
182233

234+
async fn validate_transforms(config: &Config, fmt: &mut Formatter) -> bool {
235+
let enrichment_tables = TableRegistry::default();
236+
// Register stub tables so VRL can resolve configured enrichment table names
237+
// without loading actual data. This lets us catch real VRL errors (syntax,
238+
// type, wrong table name) while deferring data-loading to the environment phase.
239+
let stubs: HashMap<String, Box<dyn vector_lib::enrichment::Table + Send + Sync>> = config
240+
.enrichment_tables
241+
.keys()
242+
.map(|key| {
243+
(
244+
key.to_string(),
245+
Box::new(StubEnrichmentTable)
246+
as Box<dyn vector_lib::enrichment::Table + Send + Sync>,
247+
)
248+
})
249+
.collect();
250+
if !stubs.is_empty() {
251+
enrichment_tables.load(stubs);
252+
// Do not call finish_load(): table_ids() and add_index() (used during
253+
// VRL compilation) both operate on the loading stage. finish_load()
254+
// would move tables to the ArcSwap and make table_ids() return nothing.
255+
}
256+
let mut definition_cache = HashMap::new();
257+
let mut errors = Vec::new();
258+
259+
for (key, transform) in config.transforms() {
260+
let input_definitions = topology::schema::input_definitions(
261+
&transform.inputs,
262+
config,
263+
enrichment_tables.clone(),
264+
&mut definition_cache,
265+
)
266+
.unwrap_or_default();
267+
268+
let merged_schema_definition = input_definitions
269+
.iter()
270+
.map(|(_, definition)| definition.clone())
271+
.reduce(Definition::merge)
272+
.unwrap_or_else(Definition::any);
273+
274+
let context = TransformContext {
275+
key: Some(key.clone()),
276+
globals: config.global.clone(),
277+
enrichment_tables: enrichment_tables.clone(),
278+
metrics_storage: MetricsStorage::default(),
279+
merged_schema_definition,
280+
schema: config.schema,
281+
..Default::default()
282+
};
283+
284+
if let Err(errs) = transform.inner.validate(&context) {
285+
for err in errs {
286+
errors.push(format!("Transform \"{key}\": {err}"));
287+
}
288+
}
289+
}
290+
291+
if errors.is_empty() {
292+
fmt.success("Transforms configuration");
293+
true
294+
} else {
295+
fmt.title("Transform errors");
296+
fmt.sub_error(errors);
297+
false
298+
}
299+
}
300+
183301
async fn validate_environment(opts: &Opts, config: &Config, fmt: &mut Formatter) -> bool {
184302
let diff = ConfigDiff::initial(config);
185303

0 commit comments

Comments
 (0)