Skip to content

Commit 8b87dfe

Browse files
committed
fix(cli): validate transform expressions without environment
1 parent 2d0d0fb commit 8b87dfe

7 files changed

Lines changed: 246 additions & 32 deletions

File tree

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
Fixed `vector validate --no-environment` so it reports VRL and condition compilation errors for transforms without requiring full environment-dependent component initialization.
2+
3+
authors: pront

src/conditions/mod.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -210,6 +210,14 @@ impl AnyCondition {
210210
AnyCondition::Map(m) => m.build(enrichment_tables, metrics_storage),
211211
}
212212
}
213+
214+
pub fn validate(
215+
&self,
216+
enrichment_tables: &vector_lib::enrichment::TableRegistry,
217+
metrics_storage: &MetricsStorage,
218+
) -> crate::Result<()> {
219+
self.build(enrichment_tables, metrics_storage).map(|_| ())
220+
}
213221
}
214222

215223
impl From<ConditionConfig> for AnyCondition {

src/config/transform.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,12 @@ pub struct TransformContext {
162162
/// (e.g. `aws_ec2_metadata`, `throttle`) clone this and pass it to [`crate::cpu_time::spawn_timed`] so
163163
/// their CPU is attributed to the component alongside the main transform task.
164164
pub cpu_ns: Option<Counter>,
165+
166+
/// Whether transform construction should skip checks that require the running environment.
167+
///
168+
/// This is set by `vector validate --no-environment` when it builds transforms to validate
169+
/// configuration that is normally checked in `build`.
170+
pub skip_environment_checks: bool,
165171
}
166172

167173
impl Default for TransformContext {
@@ -176,6 +182,7 @@ impl Default for TransformContext {
176182
schema: SchemaOptions::default(),
177183
extra_context: Default::default(),
178184
cpu_ns: None,
185+
skip_environment_checks: false,
179186
}
180187
}
181188
}

src/topology/builder.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -540,6 +540,7 @@ impl<'a> Builder<'a> {
540540
} else {
541541
None
542542
},
543+
skip_environment_checks: false,
543544
};
544545

545546
let node = TransformNode::from_parts(key.clone(), &context, transform, &input_definitions);

src/transforms/aws_ec2_metadata.rs

Lines changed: 51 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -229,27 +229,29 @@ impl TransformConfig for Ec2Metadata {
229229
tags,
230230
);
231231

232-
// If initial metadata is not required, log and proceed. Otherwise return error.
233-
if let Err(error) = client.refresh_metadata().await {
234-
if required {
235-
return Err(error);
236-
} else {
237-
emit!(AwsEc2MetadataRefreshError { error });
232+
if !context.skip_environment_checks {
233+
// If initial metadata is not required, log and proceed. Otherwise return error.
234+
if let Err(error) = client.refresh_metadata().await {
235+
if required {
236+
return Err(error);
237+
} else {
238+
emit!(AwsEc2MetadataRefreshError { error });
239+
}
238240
}
239-
}
240241

241-
// The metadata-refresh loop runs as its own tokio task, so the main
242-
// transform task's CPU-time wrapper does not see it. Spawn the
243-
// background task with the same component-tagged counter so its CPU
244-
// is attributed to this transform.
245-
spawn_timed(
246-
async move {
247-
client.run().await;
248-
}
249-
// TODO: Once #1338 is done we can fetch the current span
250-
.instrument(info_span!("aws_ec2_metadata: worker").or_current()),
251-
context.cpu_ns.clone(),
252-
);
242+
// The metadata-refresh loop runs as its own tokio task, so the main
243+
// transform task's CPU-time wrapper does not see it. Spawn the
244+
// background task with the same component-tagged counter so its CPU
245+
// is attributed to this transform.
246+
spawn_timed(
247+
async move {
248+
client.run().await;
249+
}
250+
// TODO: Once #1338 is done we can fetch the current span
251+
.instrument(info_span!("aws_ec2_metadata: worker").or_current()),
252+
context.cpu_ns.clone(),
253+
);
254+
}
253255

254256
Ok(Transform::event_task(Ec2MetadataTransform { state }))
255257
}
@@ -718,11 +720,15 @@ enum Ec2MetadataError {
718720

719721
#[cfg(test)]
720722
mod test {
723+
use tokio::{
724+
net::TcpListener,
725+
time::{Duration, timeout},
726+
};
721727
use vector_lib::lookup::OwnedTargetPath;
722728
use vrl::{owned_value_path, value::Kind};
723729

724730
use crate::{
725-
config::{LogNamespace, OutputId, TransformConfig, schema::Definition},
731+
config::{LogNamespace, OutputId, TransformConfig, TransformContext, schema::Definition},
726732
transforms::aws_ec2_metadata::Ec2Metadata,
727733
};
728734

@@ -745,6 +751,31 @@ mod test {
745751
let actual_schema_def = output.schema_definitions(true)[&OutputId::dummy()].clone();
746752
assert!(actual_schema_def.event_kind().is_object());
747753
}
754+
755+
#[tokio::test]
756+
async fn build_with_skip_environment_checks_does_not_refresh_metadata() {
757+
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
758+
let addr = listener.local_addr().unwrap();
759+
760+
let transform_config = Ec2Metadata {
761+
endpoint: format!("http://{addr}"),
762+
refresh_timeout_secs: Duration::from_millis(50),
763+
..Default::default()
764+
};
765+
let context = TransformContext {
766+
skip_environment_checks: true,
767+
..Default::default()
768+
};
769+
770+
let _transform = transform_config.build(&context).await.unwrap();
771+
772+
assert!(
773+
timeout(Duration::from_millis(100), listener.accept())
774+
.await
775+
.is_err(),
776+
"metadata endpoint was accessed while environment checks were skipped"
777+
);
778+
}
748779
}
749780

750781
#[cfg(feature = "aws-ec2-metadata-integration-tests")]

src/validate.rs

Lines changed: 77 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,15 @@
11
#![allow(missing_docs)]
22

3-
use std::{fmt, fs::remove_dir_all, path::PathBuf};
3+
use std::{collections::HashMap, fmt, fs::remove_dir_all, path::PathBuf};
44

55
use clap::Parser;
66
use colored::*;
77
use exitcode::ExitCode;
8+
use vector_vrl_metrics::MetricsStorage;
89

910
use crate::{
10-
config::{self, Config, ConfigDiff, loading::ConfigBuilderLoader},
11+
config::{self, Config, ConfigDiff, TransformContext, loading::ConfigBuilderLoader},
12+
schema::Definition,
1113
topology::{
1214
self,
1315
builder::{TopologyPieces, TopologyPiecesBuilder},
@@ -117,13 +119,13 @@ pub async fn validate(opts: &Opts, color: bool) -> ExitCode {
117119
None => return exitcode::CONFIG,
118120
};
119121

120-
if !opts.no_environment {
121-
if let Some(tmp_directory) = create_tmp_directory(&mut config, &mut fmt) {
122-
validated &= validate_environment(opts, &config, &mut fmt).await;
123-
remove_tmp_directory(tmp_directory);
124-
} else {
125-
validated = false;
126-
}
122+
if opts.no_environment {
123+
validated &= validate_transforms_no_environment(&config, &mut fmt).await;
124+
} else if let Some(tmp_directory) = create_tmp_directory(&mut config, &mut fmt) {
125+
validated &= validate_environment(opts, &config, &mut fmt).await;
126+
remove_tmp_directory(tmp_directory);
127+
} else {
128+
validated = false;
127129
}
128130

129131
if validated {
@@ -180,6 +182,72 @@ pub fn validate_config(opts: &Opts, fmt: &mut Formatter) -> Option<Config> {
180182
Some(config)
181183
}
182184

185+
async fn validate_transforms_no_environment(config: &Config, fmt: &mut Formatter) -> bool {
186+
let enrichment_tables = vector_lib::enrichment::TableRegistry::default();
187+
let metrics_storage = MetricsStorage::default();
188+
let mut definition_cache = HashMap::new();
189+
let mut errors = Vec::new();
190+
191+
for (key, transform) in config.transforms() {
192+
let input_definitions = topology::schema::input_definitions(
193+
&transform.inputs,
194+
config,
195+
enrichment_tables.clone(),
196+
&mut definition_cache,
197+
)
198+
.unwrap_or_default();
199+
200+
let merged_schema_definition = input_definitions
201+
.iter()
202+
.map(|(_, definition)| definition.clone())
203+
.reduce(Definition::merge)
204+
.unwrap_or_else(Definition::any);
205+
206+
let schema_definitions = transform
207+
.inner
208+
.outputs(
209+
&TransformContext {
210+
enrichment_tables: enrichment_tables.clone(),
211+
metrics_storage: metrics_storage.clone(),
212+
schema: config.schema,
213+
..Default::default()
214+
},
215+
&input_definitions,
216+
)
217+
.into_iter()
218+
.map(|output| {
219+
let definitions = output.schema_definitions(config.schema.enabled);
220+
(output.port, definitions)
221+
})
222+
.collect();
223+
224+
let context = TransformContext {
225+
key: Some(key.clone()),
226+
globals: config.global.clone(),
227+
enrichment_tables: enrichment_tables.clone(),
228+
metrics_storage: metrics_storage.clone(),
229+
schema_definitions,
230+
merged_schema_definition,
231+
schema: config.schema,
232+
skip_environment_checks: true,
233+
..Default::default()
234+
};
235+
236+
if let Err(error) = transform.inner.build(&context).await {
237+
errors.push(format!("Transform \"{key}\": {error}"));
238+
}
239+
}
240+
241+
if errors.is_empty() {
242+
fmt.success("Transform configuration");
243+
true
244+
} else {
245+
fmt.title("Transform errors");
246+
fmt.sub_error(errors);
247+
false
248+
}
249+
}
250+
183251
async fn validate_environment(opts: &Opts, config: &Config, fmt: &mut Formatter) -> bool {
184252
let diff = ConfigDiff::initial(config);
185253

tests/integration/cli.rs

Lines changed: 99 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,87 @@ fn validate_ignore_healthcheck() {
132132
);
133133
}
134134

135+
#[test]
136+
fn validate_no_environment_reports_transform_vrl_errors() {
137+
assert_eq!(
138+
validate_with_args(
139+
indoc! {r#"
140+
data_dir = "${VECTOR_DATA_DIR}"
141+
142+
[sources.in]
143+
type = "demo_logs"
144+
format = "shuffle"
145+
lines = ["log"]
146+
147+
[transforms.broken]
148+
inputs = ["in"]
149+
type = "remap"
150+
source = ".foo = to_int(.bar)"
151+
152+
[sinks.out]
153+
inputs = ["broken"]
154+
type = "blackhole"
155+
"#},
156+
&["--no-environment"],
157+
),
158+
exitcode::CONFIG
159+
);
160+
}
161+
162+
#[test]
163+
fn validate_no_environment_validates_condition_transforms() {
164+
assert_eq!(
165+
validate_with_args(
166+
indoc! {r#"
167+
data_dir = "${VECTOR_DATA_DIR}"
168+
169+
[sources.in]
170+
type = "demo_logs"
171+
format = "shuffle"
172+
lines = ["log"]
173+
174+
[transforms.filtered]
175+
inputs = ["in"]
176+
type = "filter"
177+
condition = "exists(.message)"
178+
179+
[sinks.out]
180+
inputs = ["filtered"]
181+
type = "blackhole"
182+
"#},
183+
&["--no-environment"],
184+
),
185+
exitcode::OK
186+
);
187+
}
188+
189+
#[test]
190+
fn validate_no_environment_skips_aws_ec2_metadata_environment_check() {
191+
assert_eq!(
192+
validate_with_args(
193+
indoc! {r#"
194+
data_dir = "${VECTOR_DATA_DIR}"
195+
196+
[sources.in]
197+
type = "demo_logs"
198+
format = "shuffle"
199+
lines = ["log"]
200+
201+
[transforms.meta]
202+
inputs = ["in"]
203+
type = "aws_ec2_metadata"
204+
endpoint = "http://127.0.0.1:9"
205+
206+
[sinks.out]
207+
inputs = ["meta"]
208+
type = "blackhole"
209+
"#},
210+
&["--no-environment"],
211+
),
212+
exitcode::OK
213+
);
214+
}
215+
135216
#[test]
136217
fn test_command_no_escape_codes_in_output() {
137218
// A config with an unhandled fallible VRL function call (missing `!`).
@@ -183,19 +264,34 @@ fn test_command_no_escape_codes_in_output() {
183264
}
184265

185266
fn validate(config: &str) -> i32 {
267+
validate_with_args(config, &[])
268+
}
269+
270+
fn validate_with_args(config: &str, args: &[&str]) -> i32 {
271+
validate_output_with_args(config, args)
272+
.status
273+
.code()
274+
.unwrap()
275+
}
276+
277+
fn validate_output_with_args(config: &str, args: &[&str]) -> std::process::Output {
186278
let dir = create_directory();
187279

188280
// Config with some components that write to file system.
189281
let config = create_file(config);
190282

191283
// Run vector
192284
let mut cmd = Command::cargo_bin("vector").unwrap();
193-
cmd.arg("validate").arg(config).env("VECTOR_DATA_DIR", dir);
285+
cmd.arg("validate");
286+
for arg in args {
287+
cmd.arg(arg);
288+
}
289+
cmd.arg(config).env("VECTOR_DATA_DIR", dir);
194290

195291
let output = cmd.output().unwrap();
196292
println!(
197293
"{}",
198-
String::from_utf8(output.stdout).expect("Vector output isn't a valid utf8 string")
294+
String::from_utf8(output.stdout.clone()).expect("Vector output isn't a valid utf8 string")
199295
);
200-
output.status.code().unwrap()
296+
output
201297
}

0 commit comments

Comments
 (0)