Skip to content

Commit 3e5305e

Browse files
author
kazmosahebi
committed
fix: wire up kafka topic refresh loop and make auto-discovery opt-in
- Connect the existing TopicRefreshHandle to KafkaTransport so periodic topic re-resolution actually runs (was implemented but never wired up) - Add auto_discover field to KafkaConfig (default false) so producers with empty topics don't trigger unwanted broker auto-discovery - Default topic_exclude to ["^__"] to filter Kafka internal topics like __consumer_offsets from auto-discovery results - Demote worker pool scaling log from info to debug to reduce log spam
1 parent 11a63da commit 3e5305e

3 files changed

Lines changed: 83 additions & 18 deletions

File tree

src/transport/kafka/config.rs

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -349,14 +349,22 @@ pub struct KafkaConfig {
349349
#[serde(default)]
350350
pub topics: Vec<String>,
351351

352+
/// Enable auto-discovery when `topics` is empty.
353+
/// When false (default), empty `topics` means no subscription (producer-only).
354+
/// When true, empty `topics` triggers broker auto-discovery with
355+
/// `topic_include`/`topic_exclude` filters and suppression rules.
356+
#[serde(default)]
357+
pub auto_discover: bool,
358+
352359
/// Regex patterns for topic include filtering (empty = accept all).
353360
/// Topics must match at least one pattern (OR logic).
354361
#[serde(default)]
355362
pub topic_include: Vec<String>,
356363

357364
/// Regex patterns for topic exclude filtering.
358365
/// Topics matching any pattern are excluded. Exclude wins over include.
359-
#[serde(default)]
366+
/// Default: `["^__"]` (excludes Kafka internal topics like `__consumer_offsets`).
367+
#[serde(default = "default_topic_exclude")]
360368
pub topic_exclude: Vec<String>,
361369

362370
/// Periodic topic refresh interval in seconds (0 = disabled).
@@ -466,6 +474,10 @@ pub struct KafkaConfig {
466474
pub extra_config: HashMap<String, String>,
467475
}
468476

477+
fn default_topic_exclude() -> Vec<String> {
478+
vec!["^__".to_string()]
479+
}
480+
469481
fn default_topic_refresh_secs() -> u64 {
470482
60
471483
}
@@ -534,8 +546,9 @@ impl Default for KafkaConfig {
534546
group: default_group(),
535547
client_id: default_client_id(),
536548
topics: Vec::new(),
549+
auto_discover: false,
537550
topic_include: Vec::new(),
538-
topic_exclude: Vec::new(),
551+
topic_exclude: default_topic_exclude(),
539552
topic_refresh_secs: default_topic_refresh_secs(),
540553
topic_suppression_rules: default_topic_suppression_rules(),
541554
security_protocol: default_security_protocol(),
@@ -935,7 +948,8 @@ mod tests {
935948
fn kafka_config_topic_resolution_defaults() {
936949
let config = KafkaConfig::default();
937950
assert!(config.topic_include.is_empty());
938-
assert!(config.topic_exclude.is_empty());
951+
assert_eq!(config.topic_exclude, vec!["^__".to_string()]);
952+
assert!(!config.auto_discover);
939953
assert_eq!(config.topic_refresh_secs, 60);
940954
assert_eq!(config.topic_suppression_rules.len(), 1);
941955
assert_eq!(config.topic_suppression_rules[0].preferred_suffix, "_load");

src/transport/kafka/mod.rs

Lines changed: 65 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,11 @@ pub struct KafkaTransport {
122122
healthy: Arc<AtomicBool>,
123123
/// Topics we're subscribed to (for cache warming).
124124
subscribed_topics: Vec<String>,
125+
/// Shutdown token — cancelled on close() to stop background tasks.
126+
shutdown_token: tokio_util::sync::CancellationToken,
127+
/// Periodic topic refresh handle (auto-discovery mode only).
128+
/// Checked on each recv() call to detect new/removed topics.
129+
topic_refresh: Option<std::sync::Mutex<TopicRefreshHandle>>,
125130
}
126131

127132
impl KafkaTransport {
@@ -217,20 +222,45 @@ impl KafkaTransport {
217222
.create_with_context(StatsContext::new())
218223
.map_err(|e| TransportError::Connection(format!("Failed to create consumer: {e}")))?;
219224

220-
// Resolve effective topics: use explicit list or auto-discover from broker
221-
let effective_topics = if config.topics.is_empty() {
222-
tracing::info!("Topics empty — auto-discovering from broker");
223-
let resolver = topic_resolver::TopicResolver::new(config)?;
224-
let discovered = resolver.resolve()?;
225-
if discovered.is_empty() {
226-
return Err(TransportError::Config(
227-
"Auto-discovery found no matching topics".into(),
228-
));
229-
}
230-
discovered
231-
} else {
232-
config.topics.clone()
233-
};
225+
// Resolve effective topics:
226+
// - Explicit list → subscribe to those
227+
// - Empty + auto_discover → auto-discover from broker
228+
// - Empty + !auto_discover → no subscription (producer-only)
229+
let (effective_topics, topic_refresh, shutdown_token) =
230+
if config.topics.is_empty() && config.auto_discover {
231+
tracing::info!("Topics empty — auto-discovering from broker");
232+
let resolver = topic_resolver::TopicResolver::new(config)?;
233+
let discovered = resolver.resolve()?;
234+
if discovered.is_empty() {
235+
return Err(TransportError::Config(
236+
"Auto-discovery found no matching topics".into(),
237+
));
238+
}
239+
240+
let token = tokio_util::sync::CancellationToken::new();
241+
let refresh = if config.topic_refresh_secs > 0 {
242+
let refresh_resolver = topic_resolver::TopicResolver::new(config)?;
243+
let handle = refresh_resolver.start_refresh_loop(
244+
Duration::from_secs(config.topic_refresh_secs),
245+
token.clone(),
246+
);
247+
tracing::info!(
248+
interval_secs = config.topic_refresh_secs,
249+
"Started periodic topic refresh"
250+
);
251+
Some(std::sync::Mutex::new(handle))
252+
} else {
253+
None
254+
};
255+
256+
(discovered, refresh, token)
257+
} else {
258+
(
259+
config.topics.clone(),
260+
None,
261+
tokio_util::sync::CancellationToken::new(),
262+
)
263+
};
234264

235265
// Subscribe to topics
236266
let subscribed_topics = effective_topics;
@@ -273,6 +303,8 @@ impl KafkaTransport {
273303
closed: AtomicBool::new(false),
274304
healthy,
275305
subscribed_topics,
306+
shutdown_token,
307+
topic_refresh,
276308
})
277309
}
278310

@@ -290,6 +322,7 @@ impl TransportBase for KafkaTransport {
290322
async fn close(&self) -> TransportResult<()> {
291323
self.closed.store(true, Ordering::Relaxed);
292324
self.healthy.store(false, Ordering::Relaxed);
325+
self.shutdown_token.cancel();
293326
// rdkafka handles cleanup on drop
294327
Ok(())
295328
}
@@ -386,6 +419,23 @@ impl TransportReceiver for KafkaTransport {
386419
return Err(TransportError::Closed);
387420
}
388421

422+
// Check for topic changes from the background refresh loop
423+
if let Some(ref refresh) = self.topic_refresh {
424+
if let Ok(mut handle) = refresh.lock() {
425+
if let Some(new_topics) = handle.check_changed() {
426+
let topics: Vec<&str> = new_topics.iter().map(String::as_str).collect();
427+
match self.consumer.subscribe(&topics) {
428+
Ok(()) => {
429+
tracing::info!(?new_topics, "Re-subscribed after topic refresh");
430+
}
431+
Err(e) => {
432+
tracing::warn!(error = %e, "Failed to re-subscribe after topic refresh");
433+
}
434+
}
435+
}
436+
}
437+
}
438+
389439
let timeout = Duration::from_millis(tuning::POLL_TIMEOUT_MS);
390440
let max_msgs = max;
391441

@@ -542,6 +592,7 @@ impl std::fmt::Debug for KafkaTransport {
542592
.field("subscribed_topics", &self.subscribed_topics)
543593
.field("closed", &self.closed.load(Ordering::Relaxed))
544594
.field("healthy", &self.healthy.load(Ordering::Relaxed))
595+
.field("topic_refresh_active", &self.topic_refresh.is_some())
545596
.finish_non_exhaustive()
546597
}
547598
}

src/worker/scaler.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -157,7 +157,7 @@ impl ScalingController {
157157
"Worker pool steady"
158158
);
159159
} else {
160-
tracing::info!(
160+
tracing::debug!(
161161
cpu = format!("{cpu_util:.2}"),
162162
mem = format!("{effective_memory_pressure:.2}"),
163163
current = current_permits,

0 commit comments

Comments
 (0)