Skip to content

Commit c34fc4c

Browse files
committed
fix: wire TopicResolver into KafkaTransport — auto-discover when topics empty
1 parent 146fa48 commit c34fc4c

2 files changed

Lines changed: 35 additions & 19 deletions

File tree

src/transport/kafka/mod.rs

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -62,13 +62,15 @@ pub use admin::{KafkaAdmin, TopicInfo};
6262
pub use config::{
6363
DEVTEST_PROFILE, HIGH_THROUGHPUT_CONSUMER_DEFAULTS, KafkaConfig, KafkaProfile,
6464
LOW_LATENCY_CONSUMER_DEFAULTS, PRODUCER_DEFAULTS, PRODUCER_DEVTEST, PRODUCER_EXACTLY_ONCE,
65-
PRODUCER_HIGH_THROUGHPUT, PRODUCER_LOW_LATENCY, PRODUCTION_PROFILE, merge_with_overrides,
65+
PRODUCER_HIGH_THROUGHPUT, PRODUCER_LOW_LATENCY, PRODUCTION_PROFILE, SuppressionRule,
66+
merge_with_overrides,
6667
};
6768
pub use metrics::{
6869
BrokerMetrics, KafkaMetrics, StatsContext, healthy_broker_count, total_consumer_lag,
6970
};
7071
pub use producer::{KafkaProducer, ProducerMetrics, ProducerProfile};
7172
pub use token::KafkaToken;
73+
pub use topic_resolver::{TopicRefreshHandle, TopicResolver};
7274

7375
use super::error::{TransportError, TransportResult};
7476
use super::traits::{TransportBase, TransportReceiver, TransportSender};
@@ -215,8 +217,23 @@ impl KafkaTransport {
215217
.create_with_context(StatsContext::new())
216218
.map_err(|e| TransportError::Connection(format!("Failed to create consumer: {e}")))?;
217219

220+
// Resolve effective topics: use explicit list or auto-discover from broker
221+
let effective_topics = if config.topics.is_empty() {
222+
tracing::info!("Topics empty — auto-discovering from broker");
223+
let resolver = topic_resolver::TopicResolver::new(config)?;
224+
let discovered = resolver.resolve()?;
225+
if discovered.is_empty() {
226+
return Err(TransportError::Config(
227+
"Auto-discovery found no matching topics".into(),
228+
));
229+
}
230+
discovered
231+
} else {
232+
config.topics.clone()
233+
};
234+
218235
// Subscribe to topics
219-
let subscribed_topics = config.topics.clone();
236+
let subscribed_topics = effective_topics;
220237
if !subscribed_topics.is_empty() {
221238
let topics: Vec<&str> = subscribed_topics.iter().map(String::as_str).collect();
222239
consumer

src/transport/kafka/topic_resolver.rs

Lines changed: 16 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,7 @@ impl std::fmt::Debug for TopicResolver {
146146
/// With the default rule (`_load` suppresses `_land`):
147147
/// - `auth_load` + `auth_land` → keeps `auth_load`, drops `auth_land`
148148
/// - `events_land` (no `events_load`) → kept
149+
#[must_use]
149150
pub fn apply_suppression_rules(topics: Vec<String>, rules: &[SuppressionRule]) -> Vec<String> {
150151
if rules.is_empty() {
151152
return topics;
@@ -161,16 +162,13 @@ pub fn apply_suppression_rules(topics: Vec<String>, rules: &[SuppressionRule]) -
161162
})
162163
.collect();
163164

164-
result = result
165-
.into_iter()
166-
.filter(|t| {
167-
if let Some(base) = t.strip_suffix(rule.suppressed_suffix.as_str()) {
168-
!preferred_bases.contains(base)
169-
} else {
170-
true
171-
}
172-
})
173-
.collect();
165+
result.retain(|t| {
166+
if let Some(base) = t.strip_suffix(rule.suppressed_suffix.as_str()) {
167+
!preferred_bases.contains(base)
168+
} else {
169+
true
170+
}
171+
});
174172
}
175173
result
176174
}
@@ -184,6 +182,7 @@ pub fn apply_suppression_rules(topics: Vec<String>, rules: &[SuppressionRule]) -
184182
/// - **Include**: if patterns exist, the topic MUST match at least one (OR).
185183
/// Empty include list means all topics are accepted.
186184
/// - **Exclude**: the topic MUST NOT match any pattern (OR). Exclude wins over include.
185+
#[must_use]
187186
pub fn passes_filters(topic: &str, include: &[Regex], exclude: &[Regex]) -> bool {
188187
if !include.is_empty() && !include.iter().any(|r| r.is_match(topic)) {
189188
return false;
@@ -229,12 +228,11 @@ impl TopicRefreshHandle {
229228
if self.rx.has_changed().unwrap_or(false) {
230229
self.rx.mark_changed();
231230
let current = self.rx.borrow().clone();
232-
if current != self.last_seen {
233-
self.last_seen = current.clone();
234-
Some(current)
235-
} else {
236-
None
231+
if current == self.last_seen {
232+
return None;
237233
}
234+
self.last_seen.clone_from(&current);
235+
Some(current)
238236
} else {
239237
None
240238
}
@@ -265,6 +263,7 @@ impl TopicResolver {
265263
/// so the initial topic list is taken from `resolve()` at construction time.
266264
///
267265
/// If the shutdown token is cancelled, the background task exits cleanly.
266+
#[must_use]
268267
pub fn start_refresh_loop(
269268
self,
270269
interval: std::time::Duration,
@@ -280,11 +279,11 @@ impl TopicResolver {
280279
loop {
281280
tokio::select! {
282281
biased;
283-
_ = shutdown.cancelled() => {
282+
() = shutdown.cancelled() => {
284283
tracing::debug!("Topic refresh loop shutting down");
285284
break;
286285
}
287-
_ = ticker.tick() => {
286+
_tick = ticker.tick() => {
288287
match self.resolve() {
289288
Ok(new_topics) => {
290289
if tx.send(new_topics).is_err() {

0 commit comments

Comments
 (0)