|
1 | 1 | #![allow(missing_docs)] |
2 | 2 |
|
3 | | -use std::{fmt, fs::remove_dir_all, path::PathBuf}; |
| 3 | +use std::{collections::HashMap, fmt, fs::remove_dir_all, path::PathBuf}; |
4 | 4 |
|
5 | 5 | use clap::Parser; |
6 | 6 | use colored::*; |
7 | 7 | use exitcode::ExitCode; |
| 8 | +use vector_lib::enrichment::{Case, IndexHandle, TableRegistry}; |
| 9 | +use vector_vrl_metrics::MetricsStorage; |
| 10 | +use vrl::value::ObjectMap; |
8 | 11 |
|
9 | 12 | use crate::{ |
10 | | - config::{self, Config, ConfigDiff, loading::ConfigBuilderLoader}, |
| 13 | + config::{self, Config, ConfigDiff, TransformContext, loading::ConfigBuilderLoader}, |
| 14 | + schema::Definition, |
11 | 15 | topology::{ |
12 | 16 | self, |
13 | 17 | builder::{TopologyPieces, TopologyPiecesBuilder}, |
14 | 18 | }, |
15 | 19 | }; |
16 | 20 |
|
| 21 | +/// Stub enrichment table used during transform validation so VRL can resolve |
| 22 | +/// table name references without loading actual table data. |
| 23 | +#[derive(Clone)] |
| 24 | +struct StubEnrichmentTable; |
| 25 | + |
| 26 | +impl vector_lib::enrichment::Table for StubEnrichmentTable { |
| 27 | + fn find_table_row<'a>( |
| 28 | + &self, |
| 29 | + _: Case, |
| 30 | + _: &'a [vector_lib::enrichment::Condition<'a>], |
| 31 | + _: Option<&[String]>, |
| 32 | + _: Option<&vrl::value::Value>, |
| 33 | + _: Option<IndexHandle>, |
| 34 | + ) -> Result<ObjectMap, vector_lib::enrichment::Error> { |
| 35 | + unreachable!("stub table is compile-time only") |
| 36 | + } |
| 37 | + |
| 38 | + fn find_table_rows<'a>( |
| 39 | + &self, |
| 40 | + _: Case, |
| 41 | + _: &'a [vector_lib::enrichment::Condition<'a>], |
| 42 | + _: Option<&[String]>, |
| 43 | + _: Option<&vrl::value::Value>, |
| 44 | + _: Option<IndexHandle>, |
| 45 | + ) -> Result<Vec<ObjectMap>, vector_lib::enrichment::Error> { |
| 46 | + unreachable!("stub table is compile-time only") |
| 47 | + } |
| 48 | + |
| 49 | + fn add_index( |
| 50 | + &mut self, |
| 51 | + _: Case, |
| 52 | + _: &[&str], |
| 53 | + ) -> Result<IndexHandle, vector_lib::enrichment::Error> { |
| 54 | + Ok(IndexHandle(0)) |
| 55 | + } |
| 56 | + |
| 57 | + fn index_fields(&self) -> Vec<(Case, Vec<String>)> { |
| 58 | + vec![] |
| 59 | + } |
| 60 | + |
| 61 | + fn needs_reload(&self) -> bool { |
| 62 | + false |
| 63 | + } |
| 64 | +} |
| 65 | + |
17 | 66 | const TEMPORARY_DIRECTORY: &str = "validate_tmp"; |
18 | 67 |
|
19 | 68 | #[derive(Parser, Debug)] |
@@ -117,6 +166,8 @@ pub async fn validate(opts: &Opts, color: bool) -> ExitCode { |
117 | 166 | None => return exitcode::CONFIG, |
118 | 167 | }; |
119 | 168 |
|
| 169 | + validated &= validate_transforms(&config, &mut fmt).await; |
| 170 | + |
120 | 171 | if !opts.no_environment { |
121 | 172 | if let Some(tmp_directory) = create_tmp_directory(&mut config, &mut fmt) { |
122 | 173 | validated &= validate_environment(opts, &config, &mut fmt).await; |
@@ -180,6 +231,74 @@ pub fn validate_config(opts: &Opts, fmt: &mut Formatter) -> Option<Config> { |
180 | 231 | Some(config) |
181 | 232 | } |
182 | 233 |
|
| 234 | +async fn validate_transforms(config: &Config, fmt: &mut Formatter) -> bool { |
| 235 | + let enrichment_tables = TableRegistry::default(); |
| 236 | + // Register stub tables so VRL can resolve configured enrichment table names |
| 237 | + // without loading actual data. This lets us catch real VRL errors (syntax, |
| 238 | + // type, wrong table name) while deferring data-loading to the environment phase. |
| 239 | + let stubs: HashMap<String, Box<dyn vector_lib::enrichment::Table + Send + Sync>> = config |
| 240 | + .enrichment_tables |
| 241 | + .keys() |
| 242 | + .map(|key| { |
| 243 | + ( |
| 244 | + key.to_string(), |
| 245 | + Box::new(StubEnrichmentTable) |
| 246 | + as Box<dyn vector_lib::enrichment::Table + Send + Sync>, |
| 247 | + ) |
| 248 | + }) |
| 249 | + .collect(); |
| 250 | + if !stubs.is_empty() { |
| 251 | + enrichment_tables.load(stubs); |
| 252 | + // Do not call finish_load(): table_ids() and add_index() (used during |
| 253 | + // VRL compilation) both operate on the loading stage. finish_load() |
| 254 | + // would move tables to the ArcSwap and make table_ids() return nothing. |
| 255 | + } |
| 256 | + let metrics_storage = MetricsStorage::default(); |
| 257 | + let mut definition_cache = HashMap::new(); |
| 258 | + let mut errors = Vec::new(); |
| 259 | + |
| 260 | + for (key, transform) in config.transforms() { |
| 261 | + let input_definitions = topology::schema::input_definitions( |
| 262 | + &transform.inputs, |
| 263 | + config, |
| 264 | + enrichment_tables.clone(), |
| 265 | + &mut definition_cache, |
| 266 | + ) |
| 267 | + .unwrap_or_default(); |
| 268 | + |
| 269 | + let merged_schema_definition = input_definitions |
| 270 | + .iter() |
| 271 | + .map(|(_, definition)| definition.clone()) |
| 272 | + .reduce(Definition::merge) |
| 273 | + .unwrap_or_else(Definition::any); |
| 274 | + |
| 275 | + let context = TransformContext { |
| 276 | + key: Some(key.clone()), |
| 277 | + globals: config.global.clone(), |
| 278 | + enrichment_tables: enrichment_tables.clone(), |
| 279 | + metrics_storage: metrics_storage.clone(), |
| 280 | + merged_schema_definition, |
| 281 | + schema: config.schema, |
| 282 | + ..Default::default() |
| 283 | + }; |
| 284 | + |
| 285 | + if let Err(errs) = transform.inner.validate(&context) { |
| 286 | + for err in errs { |
| 287 | + errors.push(format!("Transform \"{key}\": {err}")); |
| 288 | + } |
| 289 | + } |
| 290 | + } |
| 291 | + |
| 292 | + if errors.is_empty() { |
| 293 | + fmt.success("Transforms configuration"); |
| 294 | + true |
| 295 | + } else { |
| 296 | + fmt.title("Transform errors"); |
| 297 | + fmt.sub_error(errors); |
| 298 | + false |
| 299 | + } |
| 300 | +} |
| 301 | + |
183 | 302 | async fn validate_environment(opts: &Opts, config: &Config, fmt: &mut Formatter) -> bool { |
184 | 303 | let diff = ConfigDiff::initial(config); |
185 | 304 |
|
|
0 commit comments