Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
de45e11
feat(kubernetes_events source): add k8s events src
elohmeier Nov 7, 2025
697f574
Update website/cue/reference/components/sources/generated/kubernetes_…
elohmeier Jan 6, 2026
a05df90
Update website/cue/reference/components/sources/generated/kubernetes_…
elohmeier Jan 6, 2026
f8b894f
Update website/cue/reference/components/sources/generated/kubernetes_…
elohmeier Jan 6, 2026
fb4e1dc
Update website/cue/reference/components/sources/kubernetes_events.cue
elohmeier Jan 6, 2026
8294269
Update website/cue/reference/components/sources/kubernetes_events.cue
elohmeier Jan 6, 2026
a13042a
Update website/cue/reference/components/sources/kubernetes_events.cue
elohmeier Jan 6, 2026
27b3ab5
Update website/cue/reference/components/sources/kubernetes_events.cue
elohmeier Jan 6, 2026
dff5b56
Update website/cue/reference/components/sources/kubernetes_events.cue
elohmeier Jan 6, 2026
eecc5ba
Update website/cue/reference/components/sources/kubernetes_events.cue
elohmeier Jan 6, 2026
1e04ba0
Update website/cue/reference/components/sources/kubernetes_events.cue
elohmeier Jan 6, 2026
91a0039
Merge branch 'master' into k8s-events-source
pront Mar 18, 2026
ccd0053
fix(kubernetes_events): prefer series timestamps
elohmeier Apr 24, 2026
9e58987
Add leader election to kubernetes events source
elohmeier Apr 24, 2026
7f096f7
fix(kubernetes_events): align source metadata
elohmeier Apr 24, 2026
6616b0c
Merge remote-tracking branch 'origin/master' into k8s-events-source
elohmeier Apr 24, 2026
447ed29
fix(kubernetes_events): protect leader renewal
elohmeier Apr 24, 2026
009b3de
fix(kubernetes_events): defer dedupe commits
elohmeier Apr 26, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -646,6 +646,7 @@ sources-logs = [
"sources-internal_logs",
"sources-journald",
"sources-kafka",
"sources-kubernetes_events",
"sources-kubernetes_logs",
"sources-logstash",
"sources-mqtt",
Expand Down Expand Up @@ -705,6 +706,7 @@ sources-internal_metrics = []
sources-static_metrics = []
sources-journald = []
sources-kafka = ["dep:rdkafka"]
sources-kubernetes_events = ["kubernetes"]
sources-kubernetes_logs = ["vector-lib/file-source", "kubernetes", "transforms-reduce"]
sources-logstash = ["sources-utils-net-tcp", "tokio-util/net"]
sources-mongodb_metrics = ["dep:mongodb"]
Expand Down
3 changes: 3 additions & 0 deletions changelog.d/kubernetes_events_source.feature.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Introduced a `kubernetes_events` source that streams Kubernetes Event objects through the API, with optional deduplication, enrichment helpers, and Lease-based leader election for replicated deployments.

authors: elohmeier
1 change: 1 addition & 0 deletions lib/k8s-e2e-tests/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ publish = false
license = "MPL-2.0"

[dependencies]
chrono.workspace = true
futures.workspace = true
k8s-openapi = { version = "0.27.0", default-features = false, features = ["v1_31"] }
k8s-test-framework = { version = "0.1", path = "../k8s-test-framework" }
Expand Down
306 changes: 306 additions & 0 deletions lib/k8s-e2e-tests/tests/vector-aggregator.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,15 @@
#![allow(clippy::await_holding_lock)]

use std::process::Stdio;
use std::time::{Duration, Instant};

use chrono::{SecondsFormat, Utc};
use indoc::formatdoc;
use k8s_e2e_tests::*;
use k8s_test_framework::{lock, vector::Config as VectorConfig};
use tokio::io::AsyncWriteExt;

// Reason value stamped onto the test Events; the source config's
// `include_reasons` filter selects only events carrying this reason, isolating
// the test from unrelated cluster events.
const KUBERNETES_EVENTS_LEADER_ELECTION_REASON: &str = "VectorLeaderElectionTest";

/// This test validates that vector can deploy with the default
/// aggregator settings.
Expand Down Expand Up @@ -93,3 +101,301 @@ async fn metrics_pipeline() -> Result<(), Box<dyn std::error::Error>> {
drop(vector);
Ok(())
}

/// This test validates that the kubernetes_events source emits events from only
/// one replica when Lease-based leader election is enabled, and continues after
/// the leader pod is removed.
///
/// Flow: deploy a 2-replica aggregator with leader election on, emit a test
/// Event and require it to be logged exactly once (i.e. by a single replica),
/// then kill the leader pod and verify a new holder takes the Lease and the
/// next Event is again logged exactly once.
#[tokio::test]
async fn kubernetes_events_leader_election() -> Result<(), Box<dyn std::error::Error>> {
    init();

    let _guard = lock();
    let namespace = get_namespace();
    let framework = make_framework();
    let override_name = get_override_name(&namespace, "vector-events-leader");
    // Lease name must match the `lease_name` in the helm values below.
    let lease_name = format!("{override_name}-events");
    let helm_values =
        kubernetes_events_leader_election_values(&override_name, &namespace, &lease_name);
    let rbac = kubernetes_events_leader_election_rbac(&override_name);

    let vector = framework
        .helm_chart(
            &namespace,
            "vector",
            "vector",
            "https://helm.vector.dev",
            VectorConfig {
                custom_helm_values: vec![&helm_values],
                custom_resource: &rbac,
            },
        )
        .await?;
    framework
        .wait_for_rollout(
            &namespace,
            &format!("deployment/{override_name}"),
            vec!["--timeout=90s"],
        )
        .await?;

    // Phase 1: with both replicas up, exactly one (the leader) must emit the event.
    let first_event = format!("{override_name}-first");
    apply_test_event(&namespace, &first_event).await?;
    wait_for_event_count(&namespace, &override_name, &first_event, 1).await?;

    // Phase 2: delete the current leader pod and wait for failover — a
    // different pod must acquire the Lease.
    let first_holder = wait_for_lease_holder(&namespace, &lease_name, None).await?;
    kubectl(&["delete", "pod", "-n", &namespace, &first_holder]).await?;
    framework
        .wait_for_rollout(
            &namespace,
            &format!("deployment/{override_name}"),
            vec!["--timeout=90s"],
        )
        .await?;
    let _second_holder =
        wait_for_lease_holder(&namespace, &lease_name, Some(&first_holder)).await?;

    // Phase 3: the new leader must emit subsequent events, still exactly once.
    let second_event = format!("{override_name}-second");
    apply_test_event(&namespace, &second_event).await?;
    wait_for_event_count(&namespace, &override_name, &second_event, 1).await?;

    drop(vector);
    Ok(())
}

/// Renders the helm values for a 2-replica stateless aggregator running a
/// `kubernetes_events` source with Lease-based leader election enabled and a
/// JSON console sink (so emitted events can be counted from pod logs).
///
/// The short lease/renew/retry timings keep the failover portion of the test
/// fast. Services are disabled because the test only reads stdout logs.
fn kubernetes_events_leader_election_values(
    override_name: &str,
    namespace: &str,
    lease_name: &str,
) -> String {
    formatdoc! {r#"
        role: "Stateless-Aggregator"
        fullnameOverride: "{override_name}"
        replicas: 2
        image:
          pullPolicy: IfNotPresent
        service:
          enabled: false
        serviceHeadless:
          enabled: false
        customConfig:
          data_dir: /vector-data-dir
          sources:
            kubernetes_events:
              type: kubernetes_events
              namespaces: ["{namespace}"]
              include_reasons: ["{KUBERNETES_EVENTS_LEADER_ELECTION_REASON}"]
              leader_election:
                enabled: true
                lease_name: "{lease_name}"
                lease_namespace: "{namespace}"
                identity_env_var: HOSTNAME
                lease_duration_seconds: 8
                renew_deadline_seconds: 5
                retry_period_seconds: 1
          sinks:
            stdout:
              type: console
              inputs: [kubernetes_events]
              encoding:
                codec: json
    "#}
}

/// Renders the namespaced RBAC manifests the source needs: read access to
/// `events.k8s.io` Events plus get/create/update on coordination Leases (for
/// leader election), bound to the chart's ServiceAccount.
fn kubernetes_events_leader_election_rbac(override_name: &str) -> String {
    formatdoc! {r#"
        apiVersion: rbac.authorization.k8s.io/v1
        kind: Role
        metadata:
          name: {override_name}-events
        rules:
          - apiGroups: ["events.k8s.io"]
            resources: ["events"]
            verbs: ["get", "list", "watch"]
          - apiGroups: ["coordination.k8s.io"]
            resources: ["leases"]
            verbs: ["get", "create", "update"]
        ---
        apiVersion: rbac.authorization.k8s.io/v1
        kind: RoleBinding
        metadata:
          name: {override_name}-events
        subjects:
          - kind: ServiceAccount
            name: {override_name}
        roleRef:
          apiGroup: rbac.authorization.k8s.io
          kind: Role
          name: {override_name}-events
    "#}
}

/// Creates a synthetic `events.k8s.io/v1` Event named `event_name` in
/// `namespace` via `kubectl apply` on stdin.
///
/// The event carries the test's reason constant so the source's
/// `include_reasons` filter picks it up, and its `note` contains the event
/// name so the log-counting helper can grep for it. `eventTime` must be
/// RFC 3339 with microsecond precision for the events/v1 API to accept it.
async fn apply_test_event(
    namespace: &str,
    event_name: &str,
) -> Result<(), Box<dyn std::error::Error>> {
    let event_time = Utc::now().to_rfc3339_opts(SecondsFormat::Micros, true);
    let manifest = formatdoc! {r#"
        apiVersion: events.k8s.io/v1
        kind: Event
        metadata:
          name: {event_name}
          namespace: {namespace}
        eventTime: "{event_time}"
        action: Testing
        reportingController: vector.dev/e2e
        reportingInstance: {event_name}
        reason: {KUBERNETES_EVENTS_LEADER_ELECTION_REASON}
        regarding:
          apiVersion: v1
          kind: Pod
          name: leader-election-test
          namespace: {namespace}
        note: "{event_name}"
        type: Normal
    "#};

    kubectl_stdin(&["apply", "-f", "-"], manifest.as_bytes()).await
}

/// Polls the matching pods' logs until exactly `expected_count` lines mention
/// `event_name`.
///
/// Overshooting the expected count fails immediately (it means more than one
/// replica emitted the event — the very bug leader election prevents), while
/// undershooting retries once per second until a 60-second deadline.
async fn wait_for_event_count(
    namespace: &str,
    pod_name_prefix: &str,
    event_name: &str,
    expected_count: usize,
) -> Result<(), Box<dyn std::error::Error>> {
    let give_up_at = Instant::now() + Duration::from_secs(60);

    loop {
        let count = count_event_logs(namespace, pod_name_prefix, event_name).await?;
        match count.cmp(&expected_count) {
            std::cmp::Ordering::Equal => return Ok(()),
            // Too many lines can never recover — duplicate emission is a hard failure.
            std::cmp::Ordering::Greater => {
                return Err(format!(
                    "expected {expected_count} log line(s) for event {event_name}, found {count}"
                )
                .into());
            }
            std::cmp::Ordering::Less => {}
        }

        if Instant::now() >= give_up_at {
            return Err(format!(
                "timed out waiting for {expected_count} log line(s) for event {event_name}, found {count}"
            )
            .into());
        }
        tokio::time::sleep(Duration::from_secs(1)).await;
    }
}

/// Counts, across every pod whose name starts with `pod_name_prefix`, the log
/// lines that contain `event_name`.
async fn count_event_logs(
    namespace: &str,
    pod_name_prefix: &str,
    event_name: &str,
) -> Result<usize, Box<dyn std::error::Error>> {
    let mut total = 0usize;

    for pod in pods_with_prefix(namespace, pod_name_prefix).await? {
        let logs = kubectl(&["logs", "-n", namespace, &pod]).await?;
        for line in logs.lines() {
            if line.contains(event_name) {
                total += 1;
            }
        }
    }

    Ok(total)
}

/// Polls the named coordination Lease until it has a non-empty
/// `spec.holderIdentity`, returning the holder's name.
///
/// When `previous_holder` is given, additionally waits until the holder has
/// changed away from that identity (used to observe failover). Times out
/// after 60 seconds.
async fn wait_for_lease_holder(
    namespace: &str,
    lease_name: &str,
    previous_holder: Option<&str>,
) -> Result<String, Box<dyn std::error::Error>> {
    let give_up_at = Instant::now() + Duration::from_secs(60);
    let query = [
        "get",
        "lease",
        "-n",
        namespace,
        lease_name,
        "-o",
        "jsonpath={.spec.holderIdentity}",
    ];

    loop {
        let raw = kubectl(&query).await?;
        let holder = raw.trim();

        // Accept the holder only once it is non-empty and, when we're waiting
        // for a rotation, differs from the previous identity.
        let rotated = previous_holder.map_or(true, |previous| previous != holder);
        if !holder.is_empty() && rotated {
            return Ok(holder.to_string());
        }

        if Instant::now() >= give_up_at {
            return Err(format!("timed out waiting for lease holder on {lease_name}").into());
        }
        tokio::time::sleep(Duration::from_secs(1)).await;
    }
}

/// Lists the names of pods in `namespace` whose name begins with
/// `pod_name_prefix`, by fetching the pod list as JSON and filtering
/// client-side.
async fn pods_with_prefix(
    namespace: &str,
    pod_name_prefix: &str,
) -> Result<Vec<String>, Box<dyn std::error::Error>> {
    let raw = kubectl(&["get", "pods", "-n", namespace, "-o", "json"]).await?;
    let parsed: serde_json::Value = serde_json::from_str(&raw)?;

    let mut names = Vec::new();
    // A missing/non-array `items` field simply yields an empty list.
    if let Some(items) = parsed["items"].as_array() {
        for item in items {
            if let Some(name) = item["metadata"]["name"].as_str() {
                if name.starts_with(pod_name_prefix) {
                    names.push(name.to_string());
                }
            }
        }
    }

    Ok(names)
}

/// Runs `kubectl` with the given arguments and returns its stdout as UTF-8.
///
/// A non-zero exit status becomes an error carrying the joined arguments and
/// captured stderr for diagnosis.
async fn kubectl(args: &[&str]) -> Result<String, Box<dyn std::error::Error>> {
    let result = tokio::process::Command::new(kubectl_bin())
        .args(args)
        .output()
        .await?;

    if result.status.success() {
        Ok(String::from_utf8(result.stdout)?)
    } else {
        Err(format!(
            "kubectl {} failed: {}",
            args.join(" "),
            String::from_utf8_lossy(&result.stderr)
        )
        .into())
    }
}

/// Runs `kubectl` with the given arguments, feeding `stdin` to the child's
/// standard input, and returns an error (including captured stderr) on a
/// non-zero exit status.
async fn kubectl_stdin(args: &[&str], stdin: &[u8]) -> Result<(), Box<dyn std::error::Error>> {
    let mut child = tokio::process::Command::new(kubectl_bin())
        .args(args)
        .stdin(Stdio::piped())
        // Bug fix: stderr must be explicitly piped for `wait_with_output` to
        // capture it; previously it was inherited, so the error message below
        // always rendered an empty stderr and lost kubectl's diagnostics.
        .stderr(Stdio::piped())
        .spawn()?;

    {
        let mut pipe = child.stdin.take().expect("stdin should be piped");
        pipe.write_all(stdin).await?;
        // Dropping the handle at the end of this scope closes the pipe so
        // kubectl sees EOF and can finish.
    }

    let output = child.wait_with_output().await?;
    if !output.status.success() {
        return Err(format!(
            "kubectl {} failed: {}",
            args.join(" "),
            String::from_utf8_lossy(&output.stderr)
        )
        .into());
    }

    Ok(())
}

/// Resolves the kubectl binary to invoke: the `VECTOR_TEST_KUBECTL`
/// environment variable when set, otherwise a plain `kubectl` PATH lookup.
fn kubectl_bin() -> String {
    match std::env::var("VECTOR_TEST_KUBECTL") {
        Ok(path) => path,
        Err(_) => String::from("kubectl"),
    }
}
12 changes: 8 additions & 4 deletions scripts/deploy-chart-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -85,12 +85,16 @@ up() {
)
done

# Set a reasonable log level to avoid issues with internal logs
# overwriting console output.
split-container-image "$CONTAINER_IMAGE"
if [[ -z "${VECTOR_TEST_SKIP_LOG_ENV:-}" ]]; then
# Set a reasonable log level to avoid issues with internal logs
# overwriting console output.
HELM_VALUES+=(
--set "env[0].name=VECTOR_LOG"
--set "env[0].value=info"
)
fi
HELM_VALUES+=(
--set "env[0].name=VECTOR_LOG"
--set "env[0].value=info"
--set "image.repository=$CONTAINER_IMAGE_REPOSITORY"
--set "image.tag=$CONTAINER_IMAGE_TAG"
)
Expand Down
Loading
Loading