Skip to content

Commit 3f22bb0

Browse files
jubradclaude
andcommitted
kafka: add BOOTSTRAP BROKER and MATCHING broker rules for PrivateLink
Introduces two new SQL constructs for Kafka PrivateLink connections: - `BOOTSTRAP BROKER 'addr' USING AWS PRIVATELINK conn (...)` — provides the initial bootstrap address with an explicit PrivateLink tunnel. The bootstrap address is used as `bootstrap.servers` and the real hostname is preserved for correct TLS SNI. - `MATCHING 'pattern' USING AWS PRIVATELINK conn (...)` inside `BROKERS` — pattern-based routing rules for dynamically discovered brokers. After the initial metadata fetch, Kafka returns broker addresses that may differ from the bootstrap address (e.g., AZ-specific hostnames). MATCHING rules route these through the correct PrivateLink endpoint. This replaces the `AWS PRIVATELINKS` syntax which used exact-match patterns for dual-purpose bootstrap/routing and a separate `TO` keyword inconsistent with the existing `USING AWS PRIVATELINK` syntax. Example: ```sql CREATE CONNECTION kafka TO KAFKA ( BOOTSTRAP BROKER 'lkc-825730.endpoint.cloud:9092' USING AWS PRIVATELINK pl_conn (AVAILABILITY ZONE 'use1-az1'), BROKERS ( MATCHING '*use1-az1*' USING AWS PRIVATELINK pl_conn (AVAILABILITY ZONE 'use1-az1'), MATCHING '*use1-az4*' USING AWS PRIVATELINK pl_conn (AVAILABILITY ZONE 'use1-az4') ), SASL MECHANISMS 'PLAIN', SASL USERNAME 'key', SASL PASSWORD SECRET secret, SECURITY PROTOCOL 'SASL_SSL' ); ``` Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 99ca3ea commit 3f22bb0

14 files changed

Lines changed: 888 additions & 198 deletions

File tree

src/adapter/src/coord/ddl.rs

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -822,10 +822,14 @@ impl Coordinator {
822822
}
823823

824824
pub(crate) fn drop_vpc_endpoints_in_background(&self, vpc_endpoints: Vec<CatalogItemId>) {
825-
let cloud_resource_controller = Arc::clone(self.cloud_resource_controller
826-
.as_ref()
827-
.ok_or(AdapterError::Unsupported("AWS PrivateLink connections"))
828-
.expect("vpc endpoints should only be dropped in CLOUD, where `cloud_resource_controller` is `Some`"));
825+
// Match the create path (catalog_implications.rs) which gracefully
826+
// logs an error when cloud_resource_controller is None, rather than
827+
// panicking.
828+
let Some(cloud_resource_controller) = self.cloud_resource_controller.as_ref() else {
829+
warn!("dropping VPC endpoints without cloud_resource_controller; skipping cleanup");
830+
return;
831+
};
832+
let cloud_resource_controller = Arc::clone(cloud_resource_controller);
829833
// We don't want to block the coordinator on an external delete api
830834
// calls, so move the drop vpc_endpoint to a separate task. This does
831835
// mean that a failed drop won't bubble up to the user as an error

src/kafka-util/src/client.rs

Lines changed: 173 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -288,6 +288,13 @@ pub struct BrokerAddr {
288288
pub port: u16,
289289
}
290290

291+
impl BrokerAddr {
292+
/// Attempt to resolve this broker address into a list of socket addresses.
293+
pub fn to_socket_addrs(&self) -> Result<Vec<SocketAddr>, io::Error> {
294+
Ok((self.host.as_str(), self.port).to_socket_addrs()?.collect())
295+
}
296+
}
297+
291298
/// Rewrites a broker address.
292299
///
293300
/// For use with [`TunnelingClientContext`].
@@ -301,6 +308,16 @@ pub struct BrokerRewrite {
301308
pub port: Option<u16>,
302309
}
303310

311+
impl BrokerRewrite {
312+
/// Apply the rewrite to this broker address.
313+
pub fn rewrite(&self, address: &BrokerAddr) -> BrokerAddr {
314+
BrokerAddr {
315+
host: self.host.clone(),
316+
port: self.port.unwrap_or(address.port),
317+
}
318+
}
319+
}
320+
304321
#[derive(Clone)]
305322
enum BrokerRewriteHandle {
306323
Simple(BrokerRewrite),
@@ -313,6 +330,78 @@ enum BrokerRewriteHandle {
313330
FailedDefaultSshTunnel(String),
314331
}
315332

333+
#[derive(Clone)]
334+
/// Parsed from a string, with optional leading and trailing '*' wildcards.
335+
pub struct ConnectionRulePattern {
336+
/// If true, allow any combination of characters before the literal match.
337+
pub prefix_wildcard: bool,
338+
/// We expect the broker's host:port to match these characters in their entirety.
339+
pub literal_match: String,
340+
/// If true, allow any combination of characters after the literal match.
341+
pub suffix_wildcard: bool,
342+
}
343+
344+
impl std::fmt::Display for ConnectionRulePattern {
345+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
346+
if self.prefix_wildcard {
347+
f.write_str("*")?;
348+
}
349+
f.write_str(&self.literal_match)?;
350+
if self.suffix_wildcard {
351+
f.write_str("*")?;
352+
}
353+
Ok(())
354+
}
355+
}
356+
357+
impl ConnectionRulePattern {
358+
/// Does this "{host}:{port}" address fit the pattern?
359+
pub fn matches(&self, address: &str) -> bool {
360+
if self.prefix_wildcard {
361+
if self.suffix_wildcard {
362+
address.contains(&self.literal_match)
363+
} else {
364+
address.ends_with(&self.literal_match)
365+
}
366+
} else if self.suffix_wildcard {
367+
address.starts_with(&self.literal_match)
368+
} else {
369+
address == self.literal_match
370+
}
371+
}
372+
}
373+
374+
#[derive(Clone)]
375+
/// Given a host address, map it to a different host.
376+
pub struct HostMappingRules {
377+
/// Map matching hosts to a different host. First applicable rule wins.
378+
pub rules: Vec<(ConnectionRulePattern, BrokerRewrite)>,
379+
}
380+
381+
impl HostMappingRules {
382+
/// Rewrite this broker address according to the rules. Returns `None` when
383+
/// no rule matches.
384+
pub fn rewrite(&self, src: &BrokerAddr) -> Option<BrokerAddr> {
385+
let address = format!("{}:{}", src.host, src.port);
386+
for (pattern, dst) in &self.rules {
387+
if pattern.matches(&address) {
388+
let result = dst.rewrite(src);
389+
info!(
390+
"HostMappingRules: broker {}:{} matched pattern '{}' -> rewriting to {}:{}",
391+
src.host, src.port, pattern, result.host, result.port,
392+
);
393+
return Some(result);
394+
}
395+
}
396+
397+
warn!(
398+
"HostMappingRules: broker {}:{} matched no rules, using original address",
399+
src.host, src.port,
400+
);
401+
None
402+
}
403+
}
404+
316405
/// Tunneling clients
317406
/// used for re-writing ports / hosts
318407
#[derive(Clone)]
@@ -321,6 +410,8 @@ pub enum TunnelConfig {
321410
Ssh(SshTunnelConfig),
322411
/// Re-writes internal hosts using the value, used for privatelink
323412
StaticHost(String),
413+
/// Re-writes internal hosts according to an ordered list of rules, also used for privatelink
414+
Rules(HostMappingRules),
324415
/// Performs no re-writes
325416
None,
326417
}
@@ -489,7 +580,12 @@ where
489580
}
490581
}
491582

583+
/// Look up the broker's address in our book of rewrites.
584+
/// If we've already rewritten it before, reuse the existing rewrite.
585+
/// Otherwise, use our "default tunnel" rewriting strategy to attempt to rewrite this broker's address
586+
/// and record it in the book of rewrites.
492587
fn resolve_broker_addr(&self, host: &str, port: u16) -> Result<Vec<SocketAddr>, io::Error> {
588+
info!("kafka: resolve_broker_addr called for {}:{}", host, port);
493589
let return_rewrite = |rewrite: &BrokerRewriteHandle| -> Result<Vec<SocketAddr>, io::Error> {
494590
let rewrite = match rewrite {
495591
BrokerRewriteHandle::Simple(rewrite) => rewrite.clone(),
@@ -525,8 +621,12 @@ where
525621
let rewrite = self.rewrites.lock().expect("poisoned").get(&addr).cloned();
526622

527623
match rewrite {
624+
// No (successful) broker address rewrite exists yet.
528625
None | Some(BrokerRewriteHandle::FailedDefaultSshTunnel(_)) => {
626+
// "Default tunnel" is actually the configured rewriting strategy used for brokers we haven't already rewritten.
529627
match &self.default_tunnel {
628+
// This "default tunnel" is actually a default tunnel.
629+
// Try connecting so we have a valid rewrite for thsi broker address.
530630
TunnelConfig::Ssh(default_tunnel) => {
531631
// Multiple users could all run `connect` at the same time; only one ssh
532632
// tunnel will ever be connected, and only one will be inserted into the
@@ -543,6 +643,7 @@ where
543643
.await
544644
});
545645
match ssh_tunnel {
646+
// Use the tunnel we just created, but only if nobody beat us in the race.
546647
Ok(ssh_tunnel) => {
547648
let mut rewrites = self.rewrites.lock().expect("poisoned");
548649
let rewrite = match rewrites.entry(addr.clone()) {
@@ -565,6 +666,7 @@ where
565666

566667
return_rewrite(rewrite)
567668
}
669+
// We couldn't connect. Someone else will have to try again.
568670
Err(e) => {
569671
warn!(
570672
"failed to create ssh tunnel for {:?}: {}",
@@ -587,15 +689,45 @@ where
587689
}
588690
}
589691
}
692+
// Our rewrite strategy is to use a specific host, e.g. a PrivateLink endpoint.
590693
TunnelConfig::StaticHost(host) => (host.as_str(), port)
591694
.to_socket_addrs()
592695
.map(|addrs| addrs.collect()),
696+
// Rewrite according to the routing rules.
697+
TunnelConfig::Rules(rules) => {
698+
// If no rules match, just use the address as-is.
699+
let resolved = rules.rewrite(&addr).unwrap_or_else(|| addr.clone());
700+
match resolved.to_socket_addrs() {
701+
Ok(addrs) => {
702+
info!(
703+
"kafka: resolve_broker_addr {}:{} -> {}:{} resolved to {:?}",
704+
host, port, resolved.host, resolved.port, addrs,
705+
);
706+
Ok(addrs)
707+
}
708+
Err(e) => {
709+
warn!(
710+
"kafka: resolve_broker_addr {}:{} -> {}:{} DNS resolution FAILED: {e}",
711+
host, port, resolved.host, resolved.port,
712+
);
713+
Err(e)
714+
}
715+
}
716+
}
717+
// We leave the broker's address as it is.
593718
TunnelConfig::None => {
594719
(host, port).to_socket_addrs().map(|addrs| addrs.collect())
595720
}
596721
}
597722
}
598-
Some(rewrite) => return_rewrite(&rewrite),
723+
// This broker's address was already rewritten. Reuse the existing rewrite.
724+
Some(rewrite) => {
725+
info!(
726+
"kafka: resolve_broker_addr {}:{} using cached rewrite",
727+
host, port
728+
);
729+
return_rewrite(&rewrite)
730+
}
599731
}
600732
}
601733

@@ -958,3 +1090,43 @@ pub fn create_new_client_config(
9581090

9591091
config
9601092
}
1093+
1094+
#[cfg(test)]
1095+
mod tests {
1096+
use super::*;
1097+
1098+
#[mz_ore::test]
1099+
fn test_connection_rule_pattern_matches() {
1100+
let p = ConnectionRulePattern {
1101+
prefix_wildcard: false,
1102+
literal_match: "broker:9092".to_string(),
1103+
suffix_wildcard: false,
1104+
};
1105+
assert!(p.matches("broker:9092"));
1106+
assert!(!p.matches("other:9092"));
1107+
1108+
let p = ConnectionRulePattern {
1109+
prefix_wildcard: true,
1110+
literal_match: ":9092".to_string(),
1111+
suffix_wildcard: false,
1112+
};
1113+
assert!(p.matches("any-host:9092"));
1114+
assert!(!p.matches("broker:9093"));
1115+
1116+
let p = ConnectionRulePattern {
1117+
prefix_wildcard: false,
1118+
literal_match: "broker:".to_string(),
1119+
suffix_wildcard: true,
1120+
};
1121+
assert!(p.matches("broker:9092"));
1122+
assert!(!p.matches("other:9092"));
1123+
1124+
let p = ConnectionRulePattern {
1125+
prefix_wildcard: true,
1126+
literal_match: "broker".to_string(),
1127+
suffix_wildcard: true,
1128+
};
1129+
assert!(p.matches("my-broker-host:1234"));
1130+
assert!(!p.matches("other:9092"));
1131+
}
1132+
}

src/sql-lexer/src/keywords.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -279,6 +279,7 @@ Managed
279279
Manual
280280
Map
281281
Marketing
282+
Matching
282283
Materialize
283284
Materialized
284285
Max

src/sql-parser/src/ast/defs/statement.rs

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -603,6 +603,68 @@ impl<T: AstInfo> AstDisplay for ConnectionDefaultAwsPrivatelink<T> {
603603
}
604604
impl_display_t!(ConnectionDefaultAwsPrivatelink);
605605

606+
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
607+
/// A MATCHING rule inside BROKERS (...) that routes brokers matching a pattern
608+
/// through an AWS PrivateLink tunnel.
609+
pub struct KafkaMatchingBrokerRule<T: AstInfo> {
610+
/// Given a broker's host:port, should we use this route?
611+
pub pattern: ConnectionRulePattern,
612+
/// Route to the broker through this PrivateLink connection.
613+
pub tunnel: KafkaBrokerAwsPrivatelink<T>,
614+
}
615+
616+
#[derive(
617+
Debug,
618+
Clone,
619+
PartialEq,
620+
Eq,
621+
Hash,
622+
PartialOrd,
623+
Ord,
624+
Serialize,
625+
Deserialize
626+
)]
627+
/// Parsed from a string, with optional leading and trailing '*' wildcards.
628+
pub struct ConnectionRulePattern {
629+
/// If true, allow any combination of characters before the literal match.
630+
pub prefix_wildcard: bool,
631+
/// We expect the broker's host:port to match these characters in their entirety.
632+
pub literal_match: String,
633+
/// If true, allow any combination of characters after the literal match.
634+
pub suffix_wildcard: bool,
635+
}
636+
637+
impl<T: AstInfo> AstDisplay for KafkaMatchingBrokerRule<T> {
638+
fn fmt<W>(&self, f: &mut AstFormatter<W>)
639+
where
640+
W: fmt::Write,
641+
{
642+
f.write_str("MATCHING ");
643+
f.write_node(&self.pattern);
644+
f.write_str(" ");
645+
f.write_node(&self.tunnel);
646+
}
647+
}
648+
impl_display_t!(KafkaMatchingBrokerRule);
649+
650+
impl AstDisplay for ConnectionRulePattern {
651+
fn fmt<W>(&self, f: &mut AstFormatter<W>)
652+
where
653+
W: fmt::Write,
654+
{
655+
f.write_str("'");
656+
if self.prefix_wildcard {
657+
f.write_str("*");
658+
}
659+
f.write_node(&display::escape_single_quote_string(&self.literal_match));
660+
if self.suffix_wildcard {
661+
f.write_str("*");
662+
}
663+
f.write_str("'");
664+
}
665+
}
666+
impl_display!(ConnectionRulePattern);
667+
606668
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
607669
pub struct KafkaBroker<T: AstInfo> {
608670
pub address: String,
@@ -4396,6 +4458,7 @@ pub enum WithOptionValue<T: AstInfo> {
43964458
ClusterReplicas(Vec<ReplicaDefinition<T>>),
43974459
ConnectionKafkaBroker(KafkaBroker<T>),
43984460
ConnectionAwsPrivatelink(ConnectionDefaultAwsPrivatelink<T>),
4461+
KafkaMatchingBrokerRule(KafkaMatchingBrokerRule<T>),
43994462
RetainHistoryFor(Value),
44004463
Refresh(RefreshOptionValue<T>),
44014464
ClusterScheduleOptionValue(ClusterScheduleOptionValue),
@@ -4426,6 +4489,7 @@ impl<T: AstInfo> AstDisplay for WithOptionValue<T> {
44264489
| WithOptionValue::UnresolvedItemName(_)
44274490
| WithOptionValue::Ident(_)
44284491
| WithOptionValue::ConnectionAwsPrivatelink(_)
4492+
| WithOptionValue::KafkaMatchingBrokerRule(_)
44294493
| WithOptionValue::ClusterReplicas(_)
44304494
| WithOptionValue::ClusterScheduleOptionValue(_)
44314495
| WithOptionValue::ClusterAlterStrategy(_)
@@ -4477,6 +4541,9 @@ impl<T: AstInfo> AstDisplay for WithOptionValue<T> {
44774541
WithOptionValue::ConnectionAwsPrivatelink(aws_privatelink) => {
44784542
f.write_node(aws_privatelink);
44794543
}
4544+
WithOptionValue::KafkaMatchingBrokerRule(rule) => {
4545+
f.write_node(rule);
4546+
}
44804547
WithOptionValue::ConnectionKafkaBroker(broker) => {
44814548
f.write_node(broker);
44824549
}

0 commit comments

Comments
 (0)