Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 8 additions & 4 deletions src/adapter/src/coord/ddl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -822,10 +822,14 @@ impl Coordinator {
}

pub(crate) fn drop_vpc_endpoints_in_background(&self, vpc_endpoints: Vec<CatalogItemId>) {
let cloud_resource_controller = Arc::clone(self.cloud_resource_controller
.as_ref()
.ok_or(AdapterError::Unsupported("AWS PrivateLink connections"))
.expect("vpc endpoints should only be dropped in CLOUD, where `cloud_resource_controller` is `Some`"));
// Match the create path (catalog_implications.rs) which gracefully
// logs an error when cloud_resource_controller is None, rather than
// panicking.
let Some(cloud_resource_controller) = self.cloud_resource_controller.as_ref() else {
warn!("dropping VPC endpoints without cloud_resource_controller; skipping cleanup");
return;
};
let cloud_resource_controller = Arc::clone(cloud_resource_controller);
// We don't want to block the coordinator on an external delete api
// calls, so move the drop vpc_endpoint to a separate task. This does
// mean that a failed drop won't bubble up to the user as an error
Expand Down
174 changes: 173 additions & 1 deletion src/kafka-util/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,13 @@ pub struct BrokerAddr {
pub port: u16,
}

impl BrokerAddr {
    /// Resolves this broker address via DNS, returning all socket addresses
    /// it maps to.
    pub fn to_socket_addrs(&self) -> Result<Vec<SocketAddr>, io::Error> {
        (self.host.as_str(), self.port)
            .to_socket_addrs()
            .map(|addrs| addrs.collect())
    }
}

/// Rewrites a broker address.
///
/// For use with [`TunnelingClientContext`].
Expand All @@ -301,6 +308,16 @@ pub struct BrokerRewrite {
pub port: Option<u16>,
}

impl BrokerRewrite {
    /// Produces the broker address that results from applying this rewrite to
    /// `address`: the host is always replaced, while the port is replaced only
    /// when the rewrite specifies one.
    pub fn rewrite(&self, address: &BrokerAddr) -> BrokerAddr {
        let port = match self.port {
            Some(port) => port,
            None => address.port,
        };
        BrokerAddr {
            host: self.host.clone(),
            port,
        }
    }
}

#[derive(Clone)]
enum BrokerRewriteHandle {
Simple(BrokerRewrite),
Expand All @@ -313,6 +330,78 @@ enum BrokerRewriteHandle {
FailedDefaultSshTunnel(String),
}

/// A pattern matched against a broker's "{host}:{port}" address, parsed from a
/// string with optional leading and trailing '*' wildcards.
#[derive(Clone)]
pub struct ConnectionRulePattern {
    /// If true, allow any combination of characters before the literal match.
    pub prefix_wildcard: bool,
    /// We expect the broker's host:port to match these characters in their entirety.
    pub literal_match: String,
    /// If true, allow any combination of characters after the literal match.
    pub suffix_wildcard: bool,
}

impl std::fmt::Display for ConnectionRulePattern {
    /// Renders the pattern back in its source form, re-attaching the `*`
    /// wildcard markers around the literal.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        if self.prefix_wildcard {
            f.write_str("*")?;
        }
        f.write_str(&self.literal_match)?;
        if self.suffix_wildcard {
            f.write_str("*")?;
        }
        Ok(())
    }
}

impl ConnectionRulePattern {
    /// Does this "{host}:{port}" address fit the pattern?
    pub fn matches(&self, address: &str) -> bool {
        // Exhaustive match over the four wildcard combinations: substring,
        // suffix, prefix, or exact equality.
        match (self.prefix_wildcard, self.suffix_wildcard) {
            (true, true) => address.contains(&self.literal_match),
            (true, false) => address.ends_with(&self.literal_match),
            (false, true) => address.starts_with(&self.literal_match),
            (false, false) => address == self.literal_match,
        }
    }
}

/// An ordered list of rules that map a broker host address to a replacement
/// host.
#[derive(Clone)]
pub struct HostMappingRules {
    /// Map matching hosts to a different host. First applicable rule wins.
    pub rules: Vec<(ConnectionRulePattern, BrokerRewrite)>,
}

impl HostMappingRules {
    /// Rewrite this broker address according to the rules. Returns `None` when
    /// no rule matches.
    pub fn rewrite(&self, src: &BrokerAddr) -> Option<BrokerAddr> {
        // Patterns are matched against the combined "{host}:{port}" form.
        let address = format!("{}:{}", src.host, src.port);
        // First applicable rule wins.
        let hit = self
            .rules
            .iter()
            .find(|(pattern, _)| pattern.matches(&address));
        match hit {
            Some((pattern, dst)) => {
                let result = dst.rewrite(src);
                info!(
                    "HostMappingRules: broker {}:{} matched pattern '{}' -> rewriting to {}:{}",
                    src.host, src.port, pattern, result.host, result.port,
                );
                Some(result)
            }
            None => {
                warn!(
                    "HostMappingRules: broker {}:{} matched no rules, using original address",
                    src.host, src.port,
                );
                None
            }
        }
    }
}

/// Tunneling clients
/// used for re-writing ports / hosts
#[derive(Clone)]
Expand All @@ -321,6 +410,8 @@ pub enum TunnelConfig {
Ssh(SshTunnelConfig),
/// Re-writes internal hosts using the value, used for privatelink
StaticHost(String),
/// Re-writes internal hosts according to an ordered list of rules, also used for privatelink
Rules(HostMappingRules),
/// Performs no re-writes
None,
}
Expand Down Expand Up @@ -489,7 +580,12 @@ where
}
}

/// Look up the broker's address in our book of rewrites.
/// If we've already rewritten it before, reuse the existing rewrite.
/// Otherwise, use our "default tunnel" rewriting strategy to attempt to rewrite this broker's address
/// and record it in the book of rewrites.
fn resolve_broker_addr(&self, host: &str, port: u16) -> Result<Vec<SocketAddr>, io::Error> {
info!("kafka: resolve_broker_addr called for {}:{}", host, port);
let return_rewrite = |rewrite: &BrokerRewriteHandle| -> Result<Vec<SocketAddr>, io::Error> {
let rewrite = match rewrite {
BrokerRewriteHandle::Simple(rewrite) => rewrite.clone(),
Expand Down Expand Up @@ -525,8 +621,12 @@ where
let rewrite = self.rewrites.lock().expect("poisoned").get(&addr).cloned();

match rewrite {
// No (successful) broker address rewrite exists yet.
None | Some(BrokerRewriteHandle::FailedDefaultSshTunnel(_)) => {
// "Default tunnel" is actually the configured rewriting strategy used for brokers we haven't already rewritten.
match &self.default_tunnel {
// In this case the "default tunnel" is an actual SSH tunnel.
// Try connecting so we have a valid rewrite for this broker address.
TunnelConfig::Ssh(default_tunnel) => {
// Multiple users could all run `connect` at the same time; only one ssh
// tunnel will ever be connected, and only one will be inserted into the
Expand All @@ -543,6 +643,7 @@ where
.await
});
match ssh_tunnel {
// Use the tunnel we just created, but only if nobody beat us in the race.
Ok(ssh_tunnel) => {
let mut rewrites = self.rewrites.lock().expect("poisoned");
let rewrite = match rewrites.entry(addr.clone()) {
Expand All @@ -565,6 +666,7 @@ where

return_rewrite(rewrite)
}
// We couldn't connect. Someone else will have to try again.
Err(e) => {
warn!(
"failed to create ssh tunnel for {:?}: {}",
Expand All @@ -587,15 +689,45 @@ where
}
}
}
// Our rewrite strategy is to use a specific host, e.g. a PrivateLink endpoint.
TunnelConfig::StaticHost(host) => (host.as_str(), port)
.to_socket_addrs()
.map(|addrs| addrs.collect()),
// Rewrite according to the routing rules.
TunnelConfig::Rules(rules) => {
// If no rules match, just use the address as-is.
let resolved = rules.rewrite(&addr).unwrap_or_else(|| addr.clone());
match resolved.to_socket_addrs() {
Ok(addrs) => {
info!(
"kafka: resolve_broker_addr {}:{} -> {}:{} resolved to {:?}",
host, port, resolved.host, resolved.port, addrs,
);
Ok(addrs)
}
Err(e) => {
warn!(
"kafka: resolve_broker_addr {}:{} -> {}:{} DNS resolution FAILED: {e}",
host, port, resolved.host, resolved.port,
);
Err(e)
}
}
}
// We leave the broker's address as it is.
TunnelConfig::None => {
(host, port).to_socket_addrs().map(|addrs| addrs.collect())
}
}
}
Some(rewrite) => return_rewrite(&rewrite),
// This broker's address was already rewritten. Reuse the existing rewrite.
Some(rewrite) => {
info!(
"kafka: resolve_broker_addr {}:{} using cached rewrite",
host, port
);
return_rewrite(&rewrite)
}
}
}

Expand Down Expand Up @@ -958,3 +1090,43 @@ pub fn create_new_client_config(

config
}

#[cfg(test)]
mod tests {
    use super::*;

    #[mz_ore::test]
    fn test_connection_rule_pattern_matches() {
        // Exact match: no wildcards, the whole address must be equal.
        let p = ConnectionRulePattern {
            prefix_wildcard: false,
            literal_match: "broker:9092".to_string(),
            suffix_wildcard: false,
        };
        assert!(p.matches("broker:9092"));
        assert!(!p.matches("other:9092"));
        // Strict substrings/superstrings of the literal must not match.
        assert!(!p.matches("broker:909"));
        assert!(!p.matches("my-broker:9092"));

        // Prefix wildcard: any host, fixed ":9092" suffix.
        let p = ConnectionRulePattern {
            prefix_wildcard: true,
            literal_match: ":9092".to_string(),
            suffix_wildcard: false,
        };
        assert!(p.matches("any-host:9092"));
        assert!(!p.matches("broker:9093"));

        // Suffix wildcard: fixed "broker:" prefix, any port.
        let p = ConnectionRulePattern {
            prefix_wildcard: false,
            literal_match: "broker:".to_string(),
            suffix_wildcard: true,
        };
        assert!(p.matches("broker:9092"));
        assert!(!p.matches("other:9092"));

        // Both wildcards: the literal may appear anywhere in the address.
        let p = ConnectionRulePattern {
            prefix_wildcard: true,
            literal_match: "broker".to_string(),
            suffix_wildcard: true,
        };
        assert!(p.matches("my-broker-host:1234"));
        assert!(!p.matches("other:9092"));

        // Both wildcards with an empty literal matches every address.
        let p = ConnectionRulePattern {
            prefix_wildcard: true,
            literal_match: String::new(),
            suffix_wildcard: true,
        };
        assert!(p.matches("anything:1"));
    }
}
1 change: 1 addition & 0 deletions src/sql-lexer/src/keywords.txt
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,7 @@ Managed
Manual
Map
Marketing
Matching
Materialize
Materialized
Max
Expand Down
67 changes: 67 additions & 0 deletions src/sql-parser/src/ast/defs/statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -603,6 +603,68 @@ impl<T: AstInfo> AstDisplay for ConnectionDefaultAwsPrivatelink<T> {
}
impl_display_t!(ConnectionDefaultAwsPrivatelink);

/// A MATCHING rule inside BROKERS (...) that routes brokers matching a pattern
/// through an AWS PrivateLink tunnel.
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct KafkaMatchingBrokerRule<T: AstInfo> {
    /// The pattern deciding whether a broker's host:port takes this route.
    pub pattern: ConnectionRulePattern,
    /// The PrivateLink connection that matching brokers are routed through.
    pub tunnel: KafkaBrokerAwsPrivatelink<T>,
}

/// Parsed from a string, with optional leading and trailing '*' wildcards.
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)]
pub struct ConnectionRulePattern {
    /// If true, allow any combination of characters before the literal match.
    pub prefix_wildcard: bool,
    /// We expect the broker's host:port to match these characters in their entirety.
    pub literal_match: String,
    /// If true, allow any combination of characters after the literal match.
    pub suffix_wildcard: bool,
}

// Renders the rule in its SQL surface syntax: `MATCHING <pattern> <tunnel>`.
impl<T: AstInfo> AstDisplay for KafkaMatchingBrokerRule<T> {
    fn fmt<W>(&self, f: &mut AstFormatter<W>)
    where
        W: fmt::Write,
    {
        f.write_str("MATCHING ");
        f.write_node(&self.pattern);
        // A single space separates the pattern from the tunnel clause.
        f.write_str(" ");
        f.write_node(&self.tunnel);
    }
}
impl_display_t!(KafkaMatchingBrokerRule);

// Renders the pattern as a single-quoted SQL string literal with the `*`
// wildcard markers re-attached, e.g. `'*:9092'`.
impl AstDisplay for ConnectionRulePattern {
    fn fmt<W>(&self, f: &mut AstFormatter<W>)
    where
        W: fmt::Write,
    {
        f.write_str("'");
        if self.prefix_wildcard {
            f.write_str("*");
        }
        // Only single quotes are escaped here.
        // NOTE(review): a `*` embedded inside `literal_match` would render
        // indistinguishably from a wildcard marker and could be re-parsed as
        // one on round-trip — confirm the parser rejects or escapes it.
        f.write_node(&display::escape_single_quote_string(&self.literal_match));
        if self.suffix_wildcard {
            f.write_str("*");
        }
        f.write_str("'");
    }
}
impl_display!(ConnectionRulePattern);

#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct KafkaBroker<T: AstInfo> {
pub address: String,
Expand Down Expand Up @@ -4396,6 +4458,7 @@ pub enum WithOptionValue<T: AstInfo> {
ClusterReplicas(Vec<ReplicaDefinition<T>>),
ConnectionKafkaBroker(KafkaBroker<T>),
ConnectionAwsPrivatelink(ConnectionDefaultAwsPrivatelink<T>),
KafkaMatchingBrokerRule(KafkaMatchingBrokerRule<T>),
RetainHistoryFor(Value),
Refresh(RefreshOptionValue<T>),
ClusterScheduleOptionValue(ClusterScheduleOptionValue),
Expand Down Expand Up @@ -4426,6 +4489,7 @@ impl<T: AstInfo> AstDisplay for WithOptionValue<T> {
| WithOptionValue::UnresolvedItemName(_)
| WithOptionValue::Ident(_)
| WithOptionValue::ConnectionAwsPrivatelink(_)
| WithOptionValue::KafkaMatchingBrokerRule(_)
| WithOptionValue::ClusterReplicas(_)
| WithOptionValue::ClusterScheduleOptionValue(_)
| WithOptionValue::ClusterAlterStrategy(_)
Expand Down Expand Up @@ -4477,6 +4541,9 @@ impl<T: AstInfo> AstDisplay for WithOptionValue<T> {
WithOptionValue::ConnectionAwsPrivatelink(aws_privatelink) => {
f.write_node(aws_privatelink);
}
WithOptionValue::KafkaMatchingBrokerRule(rule) => {
f.write_node(rule);
}
WithOptionValue::ConnectionKafkaBroker(broker) => {
f.write_node(broker);
}
Expand Down
Loading
Loading