diff --git a/src/adapter/src/coord/ddl.rs b/src/adapter/src/coord/ddl.rs index c8e5df8ee8b63..31fd5bffd00da 100644 --- a/src/adapter/src/coord/ddl.rs +++ b/src/adapter/src/coord/ddl.rs @@ -822,10 +822,14 @@ impl Coordinator { } pub(crate) fn drop_vpc_endpoints_in_background(&self, vpc_endpoints: Vec) { - let cloud_resource_controller = Arc::clone(self.cloud_resource_controller - .as_ref() - .ok_or(AdapterError::Unsupported("AWS PrivateLink connections")) - .expect("vpc endpoints should only be dropped in CLOUD, where `cloud_resource_controller` is `Some`")); + // Match the create path (catalog_implications.rs) which gracefully + // logs an error when cloud_resource_controller is None, rather than + // panicking. + let Some(cloud_resource_controller) = self.cloud_resource_controller.as_ref() else { + warn!("dropping VPC endpoints without cloud_resource_controller; skipping cleanup"); + return; + }; + let cloud_resource_controller = Arc::clone(cloud_resource_controller); // We don't want to block the coordinator on an external delete api // calls, so move the drop vpc_endpoint to a separate task. This does // mean that a failed drop won't bubble up to the user as an error diff --git a/src/kafka-util/src/client.rs b/src/kafka-util/src/client.rs index 0181af23ae8ef..3eb0f9b262313 100644 --- a/src/kafka-util/src/client.rs +++ b/src/kafka-util/src/client.rs @@ -288,6 +288,13 @@ pub struct BrokerAddr { pub port: u16, } +impl BrokerAddr { + /// Attempt to resolve this broker address into a list of socket addresses. + pub fn to_socket_addrs(&self) -> Result, io::Error> { + Ok((self.host.as_str(), self.port).to_socket_addrs()?.collect()) + } +} + /// Rewrites a broker address. /// /// For use with [`TunnelingClientContext`]. @@ -301,6 +308,16 @@ pub struct BrokerRewrite { pub port: Option, } +impl BrokerRewrite { + /// Apply the rewrite to this broker address. 
+ pub fn rewrite(&self, address: &BrokerAddr) -> BrokerAddr { + BrokerAddr { + host: self.host.clone(), + port: self.port.unwrap_or(address.port), + } + } +} + #[derive(Clone)] enum BrokerRewriteHandle { Simple(BrokerRewrite), @@ -313,6 +330,78 @@ enum BrokerRewriteHandle { FailedDefaultSshTunnel(String), } +#[derive(Clone)] +/// Parsed from a string, with optional leading and trailing '*' wildcards. +pub struct ConnectionRulePattern { + /// If true, allow any combination of characters before the literal match. + pub prefix_wildcard: bool, + /// We expect the broker's host:port to match these characters in their entirety. + pub literal_match: String, + /// If true, allow any combination of characters after the literal match. + pub suffix_wildcard: bool, +} + +impl std::fmt::Display for ConnectionRulePattern { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + if self.prefix_wildcard { + f.write_str("*")?; + } + f.write_str(&self.literal_match)?; + if self.suffix_wildcard { + f.write_str("*")?; + } + Ok(()) + } +} + +impl ConnectionRulePattern { + /// Does this "{host}:{port}" address fit the pattern? + pub fn matches(&self, address: &str) -> bool { + if self.prefix_wildcard { + if self.suffix_wildcard { + address.contains(&self.literal_match) + } else { + address.ends_with(&self.literal_match) + } + } else if self.suffix_wildcard { + address.starts_with(&self.literal_match) + } else { + address == self.literal_match + } + } +} + +#[derive(Clone)] +/// Given a host address, map it to a different host. +pub struct HostMappingRules { + /// Map matching hosts to a different host. First applicable rule wins. + pub rules: Vec<(ConnectionRulePattern, BrokerRewrite)>, +} + +impl HostMappingRules { + /// Rewrite this broker address according to the rules. Returns `None` when + /// no rule matches. 
+ pub fn rewrite(&self, src: &BrokerAddr) -> Option { + let address = format!("{}:{}", src.host, src.port); + for (pattern, dst) in &self.rules { + if pattern.matches(&address) { + let result = dst.rewrite(src); + info!( + "HostMappingRules: broker {}:{} matched pattern '{}' -> rewriting to {}:{}", + src.host, src.port, pattern, result.host, result.port, + ); + return Some(result); + } + } + + warn!( + "HostMappingRules: broker {}:{} matched no rules, using original address", + src.host, src.port, + ); + None + } +} + /// Tunneling clients /// used for re-writing ports / hosts #[derive(Clone)] @@ -321,6 +410,8 @@ pub enum TunnelConfig { Ssh(SshTunnelConfig), /// Re-writes internal hosts using the value, used for privatelink StaticHost(String), + /// Re-writes internal hosts according to an ordered list of rules, also used for privatelink + Rules(HostMappingRules), /// Performs no re-writes None, } @@ -489,7 +580,12 @@ where } } + /// Look up the broker's address in our book of rewrites. + /// If we've already rewritten it before, reuse the existing rewrite. + /// Otherwise, use our "default tunnel" rewriting strategy to attempt to rewrite this broker's address + /// and record it in the book of rewrites. fn resolve_broker_addr(&self, host: &str, port: u16) -> Result, io::Error> { + info!("kafka: resolve_broker_addr called for {}:{}", host, port); let return_rewrite = |rewrite: &BrokerRewriteHandle| -> Result, io::Error> { let rewrite = match rewrite { BrokerRewriteHandle::Simple(rewrite) => rewrite.clone(), @@ -525,8 +621,12 @@ where let rewrite = self.rewrites.lock().expect("poisoned").get(&addr).cloned(); match rewrite { + // No (successful) broker address rewrite exists yet. None | Some(BrokerRewriteHandle::FailedDefaultSshTunnel(_)) => { + // "Default tunnel" is actually the configured rewriting strategy used for brokers we haven't already rewritten. match &self.default_tunnel { + // This "default tunnel" is actually a default tunnel. 
+ // Try connecting so we have a valid rewrite for this broker address. TunnelConfig::Ssh(default_tunnel) => { // Multiple users could all run `connect` at the same time; only one ssh // tunnel will ever be connected, and only one will be inserted into the @@ -543,6 +643,7 @@ where .await }); match ssh_tunnel { + // Use the tunnel we just created, but only if nobody beat us in the race. Ok(ssh_tunnel) => { let mut rewrites = self.rewrites.lock().expect("poisoned"); let rewrite = match rewrites.entry(addr.clone()) { @@ -565,6 +666,7 @@ where return_rewrite(rewrite) } + // We couldn't connect. Someone else will have to try again. Err(e) => { warn!( "failed to create ssh tunnel for {:?}: {}", @@ -587,15 +689,45 @@ where } } } + // Our rewrite strategy is to use a specific host, e.g. a PrivateLink endpoint. TunnelConfig::StaticHost(host) => (host.as_str(), port) .to_socket_addrs() .map(|addrs| addrs.collect()), + // Rewrite according to the routing rules. + TunnelConfig::Rules(rules) => { + // If no rules match, just use the address as-is. + let resolved = rules.rewrite(&addr).unwrap_or_else(|| addr.clone()); + match resolved.to_socket_addrs() { + Ok(addrs) => { + info!( + "kafka: resolve_broker_addr {}:{} -> {}:{} resolved to {:?}", + host, port, resolved.host, resolved.port, addrs, + ); + Ok(addrs) + } + Err(e) => { + warn!( + "kafka: resolve_broker_addr {}:{} -> {}:{} DNS resolution FAILED: {e}", + host, port, resolved.host, resolved.port, + ); + Err(e) + } + } + } + // We leave the broker's address as it is. TunnelConfig::None => { (host, port).to_socket_addrs().map(|addrs| addrs.collect()) } } } - Some(rewrite) => return_rewrite(&rewrite), + // This broker's address was already rewritten. Reuse the existing rewrite. 
+ Some(rewrite) => { + info!( + "kafka: resolve_broker_addr {}:{} using cached rewrite", + host, port + ); + return_rewrite(&rewrite) + } } } @@ -958,3 +1090,43 @@ pub fn create_new_client_config( config } + +#[cfg(test)] +mod tests { + use super::*; + + #[mz_ore::test] + fn test_connection_rule_pattern_matches() { + let p = ConnectionRulePattern { + prefix_wildcard: false, + literal_match: "broker:9092".to_string(), + suffix_wildcard: false, + }; + assert!(p.matches("broker:9092")); + assert!(!p.matches("other:9092")); + + let p = ConnectionRulePattern { + prefix_wildcard: true, + literal_match: ":9092".to_string(), + suffix_wildcard: false, + }; + assert!(p.matches("any-host:9092")); + assert!(!p.matches("broker:9093")); + + let p = ConnectionRulePattern { + prefix_wildcard: false, + literal_match: "broker:".to_string(), + suffix_wildcard: true, + }; + assert!(p.matches("broker:9092")); + assert!(!p.matches("other:9092")); + + let p = ConnectionRulePattern { + prefix_wildcard: true, + literal_match: "broker".to_string(), + suffix_wildcard: true, + }; + assert!(p.matches("my-broker-host:1234")); + assert!(!p.matches("other:9092")); + } +} diff --git a/src/sql-lexer/src/keywords.txt b/src/sql-lexer/src/keywords.txt index e1aac2a6eb128..b0188a2cc702d 100644 --- a/src/sql-lexer/src/keywords.txt +++ b/src/sql-lexer/src/keywords.txt @@ -279,6 +279,7 @@ Managed Manual Map Marketing +Matching Materialize Materialized Max diff --git a/src/sql-parser/src/ast/defs/statement.rs b/src/sql-parser/src/ast/defs/statement.rs index 1d40475faac14..06b27a2422a39 100644 --- a/src/sql-parser/src/ast/defs/statement.rs +++ b/src/sql-parser/src/ast/defs/statement.rs @@ -603,6 +603,68 @@ impl AstDisplay for ConnectionDefaultAwsPrivatelink { } impl_display_t!(ConnectionDefaultAwsPrivatelink); +#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)] +/// A MATCHING rule inside BROKERS (...) that routes brokers matching a pattern +/// through an AWS PrivateLink tunnel. 
+pub struct KafkaMatchingBrokerRule { + /// Given a broker's host:port, should we use this route? + pub pattern: ConnectionRulePattern, + /// Route to the broker through this PrivateLink connection. + pub tunnel: KafkaBrokerAwsPrivatelink, +} + +#[derive( + Debug, + Clone, + PartialEq, + Eq, + Hash, + PartialOrd, + Ord, + Serialize, + Deserialize +)] +/// Parsed from a string, with optional leading and trailing '*' wildcards. +pub struct ConnectionRulePattern { + /// If true, allow any combination of characters before the literal match. + pub prefix_wildcard: bool, + /// We expect the broker's host:port to match these characters in their entirety. + pub literal_match: String, + /// If true, allow any combination of characters after the literal match. + pub suffix_wildcard: bool, +} + +impl AstDisplay for KafkaMatchingBrokerRule { + fn fmt(&self, f: &mut AstFormatter) + where + W: fmt::Write, + { + f.write_str("MATCHING "); + f.write_node(&self.pattern); + f.write_str(" "); + f.write_node(&self.tunnel); + } +} +impl_display_t!(KafkaMatchingBrokerRule); + +impl AstDisplay for ConnectionRulePattern { + fn fmt(&self, f: &mut AstFormatter) + where + W: fmt::Write, + { + f.write_str("'"); + if self.prefix_wildcard { + f.write_str("*"); + } + f.write_node(&display::escape_single_quote_string(&self.literal_match)); + if self.suffix_wildcard { + f.write_str("*"); + } + f.write_str("'"); + } +} +impl_display!(ConnectionRulePattern); + #[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)] pub struct KafkaBroker { pub address: String, @@ -4396,6 +4458,7 @@ pub enum WithOptionValue { ClusterReplicas(Vec>), ConnectionKafkaBroker(KafkaBroker), ConnectionAwsPrivatelink(ConnectionDefaultAwsPrivatelink), + KafkaMatchingBrokerRule(KafkaMatchingBrokerRule), RetainHistoryFor(Value), Refresh(RefreshOptionValue), ClusterScheduleOptionValue(ClusterScheduleOptionValue), @@ -4426,6 +4489,7 @@ impl AstDisplay for WithOptionValue { | WithOptionValue::UnresolvedItemName(_) | 
WithOptionValue::Ident(_) | WithOptionValue::ConnectionAwsPrivatelink(_) + | WithOptionValue::KafkaMatchingBrokerRule(_) | WithOptionValue::ClusterReplicas(_) | WithOptionValue::ClusterScheduleOptionValue(_) | WithOptionValue::ClusterAlterStrategy(_) @@ -4477,6 +4541,9 @@ impl AstDisplay for WithOptionValue { WithOptionValue::ConnectionAwsPrivatelink(aws_privatelink) => { f.write_node(aws_privatelink); } + WithOptionValue::KafkaMatchingBrokerRule(rule) => { + f.write_node(rule); + } WithOptionValue::ConnectionKafkaBroker(broker) => { f.write_node(broker); } diff --git a/src/sql-parser/src/parser.rs b/src/sql-parser/src/parser.rs index be21a96e11fda..c612774c670f6 100644 --- a/src/sql-parser/src/parser.rs +++ b/src/sql-parser/src/parser.rs @@ -1895,6 +1895,22 @@ impl<'a> Parser<'a> { } } + /// Optional '=', then comma-separated list in parens/brackets. + fn parse_list_value(&mut self, f: F) -> Result, ParserError> + where + F: FnMut(&mut Self) -> Result, + { + let _ = self.consume_token(&Token::Eq); + let delimiter = self.expect_one_of_tokens(&[Token::LParen, Token::LBracket])?; + let values = self.parse_comma_separated(f)?; + self.expect_token(&match delimiter { + Token::LParen => Token::RParen, + Token::LBracket => Token::RBracket, + _ => unreachable!(), + })?; + Ok(values) + } + /// Parse a comma-separated list of 1+ items accepted by `F` fn parse_comma_separated(&mut self, mut f: F) -> Result, ParserError> where @@ -2531,6 +2547,14 @@ impl<'a> Parser<'a> { fn parse_default_aws_privatelink(&mut self) -> Result, ParserError> { let _ = self.consume_token(&Token::Eq); + Ok(WithOptionValue::ConnectionAwsPrivatelink( + self.parse_default_aws_privatelink_()?, + )) + } + + fn parse_default_aws_privatelink_( + &mut self, + ) -> Result, ParserError> { let connection = self.parse_raw_name()?; let port = if self.consume_token(&Token::LParen) { self.expect_keyword(PORT)?; @@ -2543,9 +2567,69 @@ impl<'a> Parser<'a> { } else { None }; - 
Ok(WithOptionValue::ConnectionAwsPrivatelink( - ConnectionDefaultAwsPrivatelink { connection, port }, - )) + Ok(ConnectionDefaultAwsPrivatelink { connection, port }) + } + + /// This is just like 'parse_default_aws_privatelink_' except it supports more PrivateLink options. + fn parse_aws_privatelink(&mut self) -> Result, ParserError> { + let connection = self.parse_raw_name()?; + let options = if self.consume_token(&Token::LParen) { + let options = + self.parse_comma_separated(Parser::parse_kafka_broker_aws_privatelink_option)?; + self.expect_token(&Token::RParen)?; + options + } else { + vec![] + }; + Ok(KafkaBrokerAwsPrivatelink { + connection, + options, + }) + } + + fn parse_connection_rule_pattern(&mut self) -> Result { + let s = self.parse_literal_string()?; + let pos = self.peek_prev_pos(); + let mut prefix_wildcard = false; + let mut suffix_wildcard = false; + let mut remainder = &s[..]; + + if let Some(stripped) = remainder.strip_prefix('*') { + prefix_wildcard = true; + remainder = stripped; + } + if let Some(stripped) = remainder.strip_suffix('*') { + suffix_wildcard = true; + remainder = stripped; + } + + if remainder.contains('*') { + return parser_err!( + self, + pos, + "pattern may only contain `*` as a leading and/or trailing wildcard" + ); + } + + Ok(ConnectionRulePattern { + prefix_wildcard, + literal_match: remainder.to_owned(), + suffix_wildcard, + }) + } + + fn parse_kafka_broker_or_matching_rule(&mut self) -> Result, ParserError> { + if self.parse_keyword(MATCHING) { + let pattern = self.parse_connection_rule_pattern()?; + self.expect_keyword(USING)?; + self.expect_keywords(&[AWS, PRIVATELINK])?; + let tunnel = self.parse_aws_privatelink()?; + Ok(WithOptionValue::KafkaMatchingBrokerRule( + KafkaMatchingBrokerRule { pattern, tunnel }, + )) + } else { + self.parse_kafka_broker() + } } fn parse_kafka_broker(&mut self) -> Result, ParserError> { @@ -2555,20 +2639,7 @@ impl<'a> Parser<'a> { match self.expect_one_of_keywords(&[AWS, SSH])? 
{ AWS => { self.expect_keywords(&[PRIVATELINK])?; - let connection = self.parse_raw_name()?; - let options = if self.consume_token(&Token::LParen) { - let options = self.parse_comma_separated( - Parser::parse_kafka_broker_aws_privatelink_option, - )?; - self.expect_token(&Token::RParen)?; - options - } else { - vec![] - }; - KafkaBrokerTunnel::AwsPrivatelink(KafkaBrokerAwsPrivatelink { - connection, - options, - }) + KafkaBrokerTunnel::AwsPrivatelink(self.parse_aws_privatelink()?) } SSH => { self.expect_keywords(&[TUNNEL])?; @@ -2847,51 +2918,18 @@ impl<'a> Parser<'a> { } fn parse_connection_option_unified(&mut self) -> Result, ParserError> { - let name = match self.parse_connection_option_name()? { - ConnectionOptionName::AwsConnection => { - return Ok(ConnectionOption { - name: ConnectionOptionName::AwsConnection, - value: Some(self.parse_object_option_value()?), - }); - } - ConnectionOptionName::AwsPrivatelink => { - return Ok(ConnectionOption { - name: ConnectionOptionName::AwsPrivatelink, - value: Some(self.parse_default_aws_privatelink()?), - }); - } - ConnectionOptionName::Broker => { - return Ok(ConnectionOption { - name: ConnectionOptionName::Broker, - value: Some(self.parse_kafka_broker()?), - }); - } - ConnectionOptionName::Brokers => { - let _ = self.consume_token(&Token::Eq); - let delimiter = self.expect_one_of_tokens(&[Token::LParen, Token::LBracket])?; - let brokers = self.parse_comma_separated(Parser::parse_kafka_broker)?; - self.expect_token(&match delimiter { - Token::LParen => Token::RParen, - Token::LBracket => Token::RBracket, - _ => unreachable!(), - })?; - return Ok(ConnectionOption { - name: ConnectionOptionName::Brokers, - value: Some(WithOptionValue::Sequence(brokers)), - }); - } - ConnectionOptionName::SshTunnel => { - return Ok(ConnectionOption { - name: ConnectionOptionName::SshTunnel, - value: Some(self.parse_object_option_value()?), - }); - } - name => name, + let name = self.parse_connection_option_name()?; + let value = match name 
{ + ConnectionOptionName::AwsConnection => Some(self.parse_object_option_value()?), + ConnectionOptionName::AwsPrivatelink => Some(self.parse_default_aws_privatelink()?), + ConnectionOptionName::Broker => Some(self.parse_kafka_broker()?), + ConnectionOptionName::Brokers => Some(WithOptionValue::Sequence( + self.parse_list_value(Parser::parse_kafka_broker_or_matching_rule)?, + )), + ConnectionOptionName::SshTunnel => Some(self.parse_object_option_value()?), + _ => self.parse_optional_option_value()?, }; - Ok(ConnectionOption { - name, - value: self.parse_optional_option_value()?, - }) + Ok(ConnectionOption { name, value }) } fn parse_create_subsource(&mut self) -> Result, ParserError> { diff --git a/src/sql-parser/tests/testdata/ddl b/src/sql-parser/tests/testdata/ddl index 2f61ef29fd45d..465ca01b1bb2d 100644 --- a/src/sql-parser/tests/testdata/ddl +++ b/src/sql-parser/tests/testdata/ddl @@ -631,6 +631,76 @@ CREATE CONNECTION mysqlconn TO MYSQL (AWS PRIVATELINK = db.schema.item, PORT = 1 => CreateConnection(CreateConnectionStatement { name: UnresolvedItemName([Ident("mysqlconn")]), connection_type: MySql, if_not_exists: false, values: [ConnectionOption { name: AwsPrivatelink, value: Some(ConnectionAwsPrivatelink(ConnectionDefaultAwsPrivatelink { connection: Name(UnresolvedItemName([Ident("db"), Ident("schema"), Ident("item")])), port: None })) }, ConnectionOption { name: Port, value: Some(Value(Number("1234"))) }, ConnectionOption { name: Host, value: Some(UnresolvedItemName(UnresolvedItemName([Ident("foo")]))) }, ConnectionOption { name: SslCertificate, value: Some(Value(String("cert"))) }, ConnectionOption { name: SslCertificateAuthority, value: Some(Value(String("auth"))) }, ConnectionOption { name: SslKey, value: Some(Value(String("key"))) }], with_options: [] }) +# Static broker with PrivateLink + MATCHING rules in BROKERS +parse-statement +CREATE CONNECTION kafka_connection TO KAFKA (BROKERS ('lkc-825730.endpoint.cloud:9092' USING AWS PRIVATELINK 
privatelink_svc (AVAILABILITY ZONE 'use1-az1'), MATCHING '*.use1-az1.*' USING AWS PRIVATELINK privatelink_svc (AVAILABILITY ZONE 'use1-az1'))) +---- +CREATE CONNECTION kafka_connection TO KAFKA (BROKERS = ('lkc-825730.endpoint.cloud:9092' USING AWS PRIVATELINK privatelink_svc (AVAILABILITY ZONE = 'use1-az1'), MATCHING '*.use1-az1.*' USING AWS PRIVATELINK privatelink_svc (AVAILABILITY ZONE = 'use1-az1'))) +=> +CreateConnection(CreateConnectionStatement { name: UnresolvedItemName([Ident("kafka_connection")]), connection_type: Kafka, if_not_exists: false, values: [ConnectionOption { name: Brokers, value: Some(Sequence([ConnectionKafkaBroker(KafkaBroker { address: "lkc-825730.endpoint.cloud:9092", tunnel: AwsPrivatelink(KafkaBrokerAwsPrivatelink { connection: Name(UnresolvedItemName([Ident("privatelink_svc")])), options: [KafkaBrokerAwsPrivatelinkOption { name: AvailabilityZone, value: Some(Value(String("use1-az1"))) }] }) }), KafkaMatchingBrokerRule(KafkaMatchingBrokerRule { pattern: ConnectionRulePattern { prefix_wildcard: true, literal_match: ".use1-az1.", suffix_wildcard: true }, tunnel: KafkaBrokerAwsPrivatelink { connection: Name(UnresolvedItemName([Ident("privatelink_svc")])), options: [KafkaBrokerAwsPrivatelinkOption { name: AvailabilityZone, value: Some(Value(String("use1-az1"))) }] } })])) }], with_options: [] }) + +# BROKERS with MATCHING entries +parse-statement +CREATE CONNECTION kafka_connection TO KAFKA (BROKERS (MATCHING '*.use1-az1.*' USING AWS PRIVATELINK privatelink_svc (AVAILABILITY ZONE 'use1-az1'), MATCHING '*.use1-az4.*' USING AWS PRIVATELINK privatelink_svc (AVAILABILITY ZONE 'use1-az4'))) +---- +CREATE CONNECTION kafka_connection TO KAFKA (BROKERS = (MATCHING '*.use1-az1.*' USING AWS PRIVATELINK privatelink_svc (AVAILABILITY ZONE = 'use1-az1'), MATCHING '*.use1-az4.*' USING AWS PRIVATELINK privatelink_svc (AVAILABILITY ZONE = 'use1-az4'))) +=> +CreateConnection(CreateConnectionStatement { name: UnresolvedItemName([Ident("kafka_connection")]), 
connection_type: Kafka, if_not_exists: false, values: [ConnectionOption { name: Brokers, value: Some(Sequence([KafkaMatchingBrokerRule(KafkaMatchingBrokerRule { pattern: ConnectionRulePattern { prefix_wildcard: true, literal_match: ".use1-az1.", suffix_wildcard: true }, tunnel: KafkaBrokerAwsPrivatelink { connection: Name(UnresolvedItemName([Ident("privatelink_svc")])), options: [KafkaBrokerAwsPrivatelinkOption { name: AvailabilityZone, value: Some(Value(String("use1-az1"))) }] } }), KafkaMatchingBrokerRule(KafkaMatchingBrokerRule { pattern: ConnectionRulePattern { prefix_wildcard: true, literal_match: ".use1-az4.", suffix_wildcard: true }, tunnel: KafkaBrokerAwsPrivatelink { connection: Name(UnresolvedItemName([Ident("privatelink_svc")])), options: [KafkaBrokerAwsPrivatelinkOption { name: AvailabilityZone, value: Some(Value(String("use1-az4"))) }] } })])) }], with_options: [] }) + +# MATCHING with availability zone and port +parse-statement +CREATE CONNECTION kafka_connection TO KAFKA (BROKERS (MATCHING '*.use1-az1.*' USING AWS PRIVATELINK privatelink_svc (AVAILABILITY ZONE 'use1-az1', PORT 9092))) +---- +CREATE CONNECTION kafka_connection TO KAFKA (BROKERS = (MATCHING '*.use1-az1.*' USING AWS PRIVATELINK privatelink_svc (AVAILABILITY ZONE = 'use1-az1', PORT = 9092))) +=> +CreateConnection(CreateConnectionStatement { name: UnresolvedItemName([Ident("kafka_connection")]), connection_type: Kafka, if_not_exists: false, values: [ConnectionOption { name: Brokers, value: Some(Sequence([KafkaMatchingBrokerRule(KafkaMatchingBrokerRule { pattern: ConnectionRulePattern { prefix_wildcard: true, literal_match: ".use1-az1.", suffix_wildcard: true }, tunnel: KafkaBrokerAwsPrivatelink { connection: Name(UnresolvedItemName([Ident("privatelink_svc")])), options: [KafkaBrokerAwsPrivatelinkOption { name: AvailabilityZone, value: Some(Value(String("use1-az1"))) }, KafkaBrokerAwsPrivatelinkOption { name: Port, value: Some(Value(Number("9092"))) }] } })])) }], with_options: [] }) + +# 
Mixed static and MATCHING in BROKERS +parse-statement +CREATE CONNECTION kafka_connection TO KAFKA (BROKERS ('static:9092', MATCHING '*.use1-az1.*' USING AWS PRIVATELINK privatelink_svc (AVAILABILITY ZONE 'use1-az1'))) +---- +CREATE CONNECTION kafka_connection TO KAFKA (BROKERS = ('static:9092', MATCHING '*.use1-az1.*' USING AWS PRIVATELINK privatelink_svc (AVAILABILITY ZONE = 'use1-az1'))) +=> +CreateConnection(CreateConnectionStatement { name: UnresolvedItemName([Ident("kafka_connection")]), connection_type: Kafka, if_not_exists: false, values: [ConnectionOption { name: Brokers, value: Some(Sequence([ConnectionKafkaBroker(KafkaBroker { address: "static:9092", tunnel: Direct }), KafkaMatchingBrokerRule(KafkaMatchingBrokerRule { pattern: ConnectionRulePattern { prefix_wildcard: true, literal_match: ".use1-az1.", suffix_wildcard: true }, tunnel: KafkaBrokerAwsPrivatelink { connection: Name(UnresolvedItemName([Ident("privatelink_svc")])), options: [KafkaBrokerAwsPrivatelinkOption { name: AvailabilityZone, value: Some(Value(String("use1-az1"))) }] } })])) }], with_options: [] }) + +# MATCHING requires USING AWS PRIVATELINK +parse-statement +CREATE CONNECTION kafka_connection TO KAFKA (BROKERS (MATCHING '*.az.*')) +---- +error: Expected USING, found right parenthesis +CREATE CONNECTION kafka_connection TO KAFKA (BROKERS (MATCHING '*.az.*')) + ^ + +# MATCHING patterns must reject internal `*` wildcards. Only a single leading +# and/or trailing `*` is supported. 
+parse-statement +CREATE CONNECTION kafka_connection TO KAFKA (BROKERS (MATCHING '*foo*bar*' USING AWS PRIVATELINK privatelink_svc (AVAILABILITY ZONE 'use1-az1'))) +---- +error: pattern may only contain `*` as a leading and/or trailing wildcard +CREATE CONNECTION kafka_connection TO KAFKA (BROKERS (MATCHING '*foo*bar*' USING AWS PRIVATELINK privatelink_svc (AVAILABILITY ZONE 'use1-az1'))) + ^ + +parse-statement +CREATE CONNECTION kafka_connection TO KAFKA (BROKERS (MATCHING 'foo*bar' USING AWS PRIVATELINK privatelink_svc (AVAILABILITY ZONE 'use1-az1'))) +---- +error: pattern may only contain `*` as a leading and/or trailing wildcard +CREATE CONNECTION kafka_connection TO KAFKA (BROKERS (MATCHING 'foo*bar' USING AWS PRIVATELINK privatelink_svc (AVAILABILITY ZONE 'use1-az1'))) + ^ + +parse-statement +CREATE CONNECTION kafka_connection TO KAFKA (BROKERS (MATCHING '**az1**' USING AWS PRIVATELINK privatelink_svc (AVAILABILITY ZONE 'use1-az1'))) +---- +error: pattern may only contain `*` as a leading and/or trailing wildcard +CREATE CONNECTION kafka_connection TO KAFKA (BROKERS (MATCHING '**az1**' USING AWS PRIVATELINK privatelink_svc (AVAILABILITY ZONE 'use1-az1'))) + ^ + +parse-statement +CREATE CONNECTION kafka_connection TO KAFKA (BROKERS (MATCHING 'broker-*-az1-*.host.com' USING AWS PRIVATELINK privatelink_svc (AVAILABILITY ZONE 'use1-az1'))) +---- +error: pattern may only contain `*` as a leading and/or trailing wildcard +CREATE CONNECTION kafka_connection TO KAFKA (BROKERS (MATCHING 'broker-*-az1-*.host.com' USING AWS PRIVATELINK privatelink_svc (AVAILABILITY ZONE 'use1-az1'))) + ^ + parse-statement CREATE SOURCE mz_source FROM MYSQL CONNECTION mysqlconn FOR TABLES (foo, bar as qux, baz into zop); ---- diff --git a/src/sql/src/names.rs b/src/sql/src/names.rs index f50451dad26ba..7245952c5f1ae 100644 --- a/src/sql/src/names.rs +++ b/src/sql/src/names.rs @@ -2098,6 +2098,9 @@ impl<'a> Fold for NameResolver<'a> { ConnectionAwsPrivatelink(privatelink) => { 
ConnectionAwsPrivatelink(self.fold_connection_default_aws_privatelink(privatelink)) } + KafkaMatchingBrokerRule(x) => { + KafkaMatchingBrokerRule(self.fold_kafka_matching_broker_rule(x)) + } RetainHistoryFor(value) => RetainHistoryFor(self.fold_value(value)), Refresh(refresh) => Refresh(self.fold_refresh_option_value(refresh)), ClusterScheduleOptionValue(value) => ClusterScheduleOptionValue(value), diff --git a/src/sql/src/plan/statement.rs b/src/sql/src/plan/statement.rs index 874ffe5cba61d..3be54c1b79278 100644 --- a/src/sql/src/plan/statement.rs +++ b/src/sql/src/plan/statement.rs @@ -19,12 +19,10 @@ use mz_repr::{ CatalogItemId, ColumnIndex, RelationDesc, RelationVersionSelector, SqlColumnType, SqlScalarType, }; use mz_sql_parser::ast::{ - ColumnDef, ColumnName, ConnectionDefaultAwsPrivatelink, CreateMaterializedViewStatement, - RawItemName, ShowStatement, StatementKind, TableConstraint, UnresolvedDatabaseName, - UnresolvedSchemaName, + ColumnDef, ColumnName, CreateMaterializedViewStatement, RawItemName, ShowStatement, + StatementKind, TableConstraint, UnresolvedDatabaseName, UnresolvedSchemaName, }; -use mz_storage_types::connections::inline::ReferencedConnection; -use mz_storage_types::connections::{AwsPrivatelink, Connection, SshTunnel, Tunnel}; +use mz_storage_types::connections::Connection; use crate::ast::{Ident, Statement, UnresolvedItemName}; use crate::catalog::{ @@ -39,7 +37,7 @@ use crate::names::{ }; use crate::normalize; use crate::plan::error::PlanError; -use crate::plan::{Params, Plan, PlanContext, PlanKind, query, with_options}; +use crate::plan::{Params, Plan, PlanContext, PlanKind, query}; use crate::session::vars::FeatureFlag; mod acl; @@ -883,44 +881,6 @@ impl<'a> StatementContext<'a> { self.catalog.humanize_sql_column_type(typ, postgres_compat) } - pub(crate) fn build_tunnel_definition( - &self, - ssh_tunnel: Option, - aws_privatelink: Option>, - ) -> Result, PlanError> { - match (ssh_tunnel, aws_privatelink) { - (None, None) => 
Ok(Tunnel::Direct), - (Some(ssh_tunnel), None) => { - let id = CatalogItemId::from(ssh_tunnel); - let ssh_tunnel = self.catalog.get_item(&id); - match ssh_tunnel.connection()? { - Connection::Ssh(_connection) => Ok(Tunnel::Ssh(SshTunnel { - connection_id: id, - connection: id, - })), - _ => sql_bail!("{} is not an SSH connection", ssh_tunnel.name().item), - } - } - (None, Some(aws_privatelink)) => { - let id = aws_privatelink.connection.item_id().clone(); - let entry = self.catalog.get_item(&id); - match entry.connection()? { - Connection::AwsPrivatelink(_) => Ok(Tunnel::AwsPrivatelink(AwsPrivatelink { - connection_id: id, - // By default we do not specify an availability zone for the tunnel. - availability_zone: None, - // We always use the port as specified by the top-level connection. - port: aws_privatelink.port, - })), - _ => sql_bail!("{} is not an AWS PRIVATELINK connection", entry.name().item), - } - } - (Some(_), Some(_)) => { - sql_bail!("cannot specify both SSH TUNNEL and AWS PRIVATELINK"); - } - } - } - pub fn relation_desc_into_table_defs( &self, desc: &RelationDesc, diff --git a/src/sql/src/plan/statement/ddl/connection.rs b/src/sql/src/plan/statement/ddl/connection.rs index 5b94d4e9d3553..0d34f343969f0 100644 --- a/src/sql/src/plan/statement/ddl/connection.rs +++ b/src/sql/src/plan/statement/ddl/connection.rs @@ -23,7 +23,7 @@ use mz_sql_parser::ast::display::AstDisplay; use mz_sql_parser::ast::{ ConnectionDefaultAwsPrivatelink, ConnectionOption, ConnectionOptionName, CreateConnectionType, KafkaBroker, KafkaBrokerAwsPrivatelinkOption, KafkaBrokerAwsPrivatelinkOptionName, - KafkaBrokerTunnel, + KafkaBrokerTunnel, KafkaMatchingBrokerRule, }; use mz_ssh_util::keys::SshKeyPair; use mz_storage_types::connections::aws::{ @@ -32,11 +32,11 @@ use mz_storage_types::connections::aws::{ use mz_storage_types::connections::inline::ReferencedConnection; use mz_storage_types::connections::string_or_secret::StringOrSecret; use mz_storage_types::connections::{ - 
AwsPrivatelink, AwsPrivatelinkConnection, CsrConnection, CsrConnectionHttpAuth, - IcebergCatalogConnection, IcebergCatalogImpl, IcebergCatalogType, KafkaConnection, - KafkaSaslConfig, KafkaTlsConfig, KafkaTopicOptions, MySqlConnection, MySqlSslMode, - PostgresConnection, RestIcebergCatalog, S3TablesRestIcebergCatalog, SqlServerConnectionDetails, - SshConnection, SshTunnel, TlsIdentity, Tunnel, + AwsPrivatelink, AwsPrivatelinkConnection, AwsPrivatelinkRule, CsrConnection, + CsrConnectionHttpAuth, IcebergCatalogConnection, IcebergCatalogImpl, IcebergCatalogType, + KafkaConnection, KafkaSaslConfig, KafkaTlsConfig, KafkaTopicOptions, MySqlConnection, + MySqlSslMode, PostgresConnection, RestIcebergCatalog, S3TablesRestIcebergCatalog, + SqlServerConnectionDetails, SshConnection, SshTunnel, TlsIdentity, Tunnel, }; use crate::names::Aug; @@ -53,9 +53,8 @@ generate_extracted_config!( (AvailabilityZones, Vec), (AwsConnection, with_options::Object), (AwsPrivatelink, ConnectionDefaultAwsPrivatelink), - // (AwsPrivatelink, with_options::Object), (Broker, Vec>), - (Brokers, Vec>), + (Brokers, with_options::BrokersList), (Credential, StringOrSecret), (Database, String), (Endpoint, String), @@ -315,11 +314,20 @@ impl ConnectionOptionExtracted { } CreateConnectionType::Kafka => { let (tls, sasl) = plan_kafka_security(scx, &self)?; + let (static_brokers, matching_rules) = self.get_brokers_and_rules(scx)?; + + if !matching_rules.is_empty() { + scx.require_feature_flag(&vars::ENABLE_KAFKA_BROKER_MATCHING_RULES)?; + } ConnectionDetails::Kafka(KafkaConnection { - brokers: self.get_brokers(scx)?, - default_tunnel: scx - .build_tunnel_definition(self.ssh_tunnel, self.aws_privatelink)?, + brokers: static_brokers, + default_tunnel: build_tunnel_definition( + scx, + self.ssh_tunnel, + self.aws_privatelink, + if matching_rules.is_empty() { None } else { Some(matching_rules) }, + )?, progress_topic: self.progress_topic, progress_topic_options: KafkaTopicOptions { // We only allow configuring 
the progress topic replication factor for now. @@ -377,7 +385,12 @@ impl ConnectionOptionExtracted { ) } } - let tunnel = scx.build_tunnel_definition(self.ssh_tunnel, self.aws_privatelink)?; + let tunnel = build_tunnel_definition( + scx, + self.ssh_tunnel, + self.aws_privatelink, + None, /* Rule-based PrivateLink is not supported for CSR. */ + )?; ConnectionDetails::Csr(CsrConnection { url, @@ -419,7 +432,12 @@ impl ConnectionOptionExtracted { ) } } - let tunnel = scx.build_tunnel_definition(self.ssh_tunnel, self.aws_privatelink)?; + let tunnel = build_tunnel_definition( + scx, + self.ssh_tunnel, + self.aws_privatelink, + None, /* Rule-based PrivateLink is not supported for Postgres. */ + )?; ConnectionDetails::Postgres(PostgresConnection { database: self @@ -520,7 +538,12 @@ impl ConnectionOptionExtracted { ) } } - let tunnel = scx.build_tunnel_definition(self.ssh_tunnel, self.aws_privatelink)?; + let tunnel = build_tunnel_definition( + scx, + self.ssh_tunnel, + self.aws_privatelink, + None, /* Rule-based PrivateLink is not supported for MySQL. */ + )?; ConnectionDetails::MySql(MySqlConnection { password: self.password.map(|password| password.into()), @@ -594,7 +617,12 @@ impl ConnectionOptionExtracted { // // See: let port = self.port.unwrap_or(1433_u16); - let tunnel = scx.build_tunnel_definition(self.ssh_tunnel, self.aws_privatelink)?; + let tunnel = build_tunnel_definition( + scx, + self.ssh_tunnel, + self.aws_privatelink, + None, /* Rule-based PrivateLink is not supported for SQL Server. 
*/ + )?; ConnectionDetails::SqlServer(SqlServerConnectionDetails { host: self @@ -673,76 +701,65 @@ impl ConnectionOptionExtracted { Ok(connection) } - pub fn get_brokers( + pub fn get_brokers_and_rules( &self, scx: &StatementContext, - ) -> Result>, PlanError> - { - let mut brokers = match (&self.broker, &self.brokers, &self.aws_privatelink) { - (Some(v), None, None) => v.to_vec(), - (None, Some(v), None) => v.to_vec(), - (None, None, Some(_)) => vec![], + ) -> Result< + ( + Vec>, + Vec>, + ), + PlanError, + > { + // Collect all static broker sources and matching rules. + let mut all_brokers: Vec> = vec![]; + let mut matching_rules: Vec> = vec![]; + + // Check exclusions and extract brokers + match (&self.broker, &self.brokers, &self.aws_privatelink) { + // BROKER + (Some(broker), None, None) => all_brokers.extend(broker.iter().cloned()), + // BROKERS + (None, Some(broker_list), None) => { + all_brokers.extend(broker_list.static_entries.iter().cloned()); + matching_rules.extend(broker_list.matching_rules.iter().cloned()); + } + // AWS PRIVATELINK + (None, None, Some(_privatelink)) => { + // Nothing to do here + } + // nothing - invalid (None, None, None) => { sql_bail!("invalid CONNECTION: must set one of BROKER, BROKERS, or AWS PRIVATELINK") } + // exclusive keywords provided _ => sql_bail!( "invalid CONNECTION: can only set one of BROKER, BROKERS, or AWS PRIVATELINK" ), }; + // MATCHING rules require at least one static broker for bootstrapping. + if !matching_rules.is_empty() && all_brokers.is_empty() { + sql_bail!( + "invalid CONNECTION: BROKERS must contain at least one static broker address" + ); + } + // NOTE: we allow broker configurations to be mixed and matched. If/when we support // a top-level `SSH TUNNEL` configuration, we will need additional assertions.
- let mut out = vec![]; - for broker in &mut brokers { + let mut out = vec![]; + for broker in &all_brokers { if broker.address.contains(',') { - sql_bail!("invalid CONNECTION: cannot specify multiple Kafka broker addresses in one string.\n\n -Instead, specify BROKERS using multiple strings, e.g. BROKERS ('kafka:9092', 'kafka:9093')"); + sql_bail!( + "invalid CONNECTION: cannot specify multiple Kafka broker addresses in one string.\n\nInstead, specify BROKERS using multiple strings, e.g. BROKERS ('kafka:9092', 'kafka:9093')" + ); } let tunnel = match &broker.tunnel { KafkaBrokerTunnel::Direct => Tunnel::Direct, KafkaBrokerTunnel::AwsPrivatelink(aws_privatelink) => { - let KafkaBrokerAwsPrivatelinkOptionExtracted { - availability_zone, - port, - seen: _, - } = KafkaBrokerAwsPrivatelinkOptionExtracted::try_from( - aws_privatelink.options.clone(), - )?; - - let id = match &aws_privatelink.connection { - ResolvedItemName::Item { id, .. } => id, - _ => sql_bail!( - "internal error: Kafka PrivateLink connection was not resolved" - ), - }; - let entry = scx.catalog.get_item(id); - match entry.connection()? { - Connection::AwsPrivatelink(connection) => { - if let Some(az) = &availability_zone { - if !connection.availability_zones.contains(az) { - sql_bail!( - "AWS PrivateLink availability zone {} does not match any of the \ - availability zones on the AWS PrivateLink connection {}", - az.quoted(), - scx.catalog - .resolve_full_name(entry.name()) - .to_string() - .quoted() - ) - } - } - Tunnel::AwsPrivatelink(AwsPrivatelink { - connection_id: *id, - availability_zone, - port, - }) - } - _ => { - sql_bail!("{} is not an AWS PRIVATELINK connection", entry.name().item) - } - } + Tunnel::AwsPrivatelink(plan_privatelink(scx, aws_privatelink)?) } KafkaBrokerTunnel::SshTunnel(ssh) => { let id = match &ssh { @@ -770,7 +786,7 @@ impl ConnectionOptionExtracted {
BROKERS ('kafka:9092', 'ka }); } - Ok(out) + Ok((out, matching_rules)) } } @@ -927,3 +943,106 @@ fn plan_kafka_security( Ok((tls, sasl)) } +pub fn plan_default_privatelink( + scx: &StatementContext, + pl: &mz_sql_parser::ast::ConnectionDefaultAwsPrivatelink, +) -> Result { + let id = pl.connection.item_id().clone(); + let entry = scx.catalog.get_item(&id); + match entry.connection()? { + Connection::AwsPrivatelink(_) => Ok(AwsPrivatelink { + connection_id: id, + // By default we do not specify an availability zone for the tunnel. + availability_zone: None, + // We always use the port as specified by the top-level connection. + port: pl.port, + }), + _ => sql_bail!("{} is not an AWS PRIVATELINK connection", entry.name().item), + } +} + +pub fn plan_privatelink( + scx: &StatementContext, + pl: &mz_sql_parser::ast::KafkaBrokerAwsPrivatelink, +) -> Result { + let KafkaBrokerAwsPrivatelinkOptionExtracted { + availability_zone, + port, + seen: _, + } = KafkaBrokerAwsPrivatelinkOptionExtracted::try_from(pl.options.clone())?; + + let id = match &pl.connection { + ResolvedItemName::Item { id, .. } => id, + _ => sql_bail!("internal error: Kafka PrivateLink connection was not resolved"), + }; + let entry = scx.catalog.get_item(id); + match entry.connection()? 
{ + Connection::AwsPrivatelink(connection) => { + if let Some(az) = &availability_zone { + if !connection.availability_zones.contains(az) { + sql_bail!( + "AWS PrivateLink availability zone {} does not match any of the \ + availability zones on the AWS PrivateLink connection {}", + az.quoted(), + scx.catalog + .resolve_full_name(entry.name()) + .to_string() + .quoted() + ) + } + } + Ok(AwsPrivatelink { + connection_id: *id, + availability_zone, + port, + }) + } + _ => { + sql_bail!("{} is not an AWS PRIVATELINK connection", entry.name().item) + } + } +} + +pub(crate) fn build_tunnel_definition( + scx: &StatementContext, + ssh_tunnel: Option, + aws_privatelink: Option>, + matching_rules: Option>>, +) -> Result, PlanError> { + Ok(match (ssh_tunnel, aws_privatelink, matching_rules) { + (None, None, None) => Tunnel::Direct, + (Some(ssh_tunnel), None, None) => { + let id = CatalogItemId::from(ssh_tunnel); + let ssh_tunnel = scx.catalog.get_item(&id); + match ssh_tunnel.connection()? { + Connection::Ssh(_connection) => Tunnel::Ssh(SshTunnel { + connection_id: id, + connection: id, + }), + _ => sql_bail!("{} is not an SSH connection", ssh_tunnel.name().item), + } + } + (None, Some(aws_privatelink), None) => { + Tunnel::AwsPrivatelink(plan_default_privatelink(scx, &aws_privatelink)?) 
+ } + (None, None, Some(rules)) => { + if rules.is_empty() { + sql_bail!("BROKERS MATCHING rules list cannot be empty"); + } + + let rules = rules + .iter() + .map(|rule| { + Ok(AwsPrivatelinkRule { + pattern: rule.pattern.clone(), + to: plan_privatelink(scx, &rule.tunnel)?, + }) + }) + .collect::, PlanError>>()?; + Tunnel::AwsPrivatelinks(mz_storage_types::connections::AwsPrivatelinks { rules }) + } + _ => { + sql_bail!("cannot specify both SSH TUNNEL and AWS PRIVATELINK"); + } + }) +} diff --git a/src/sql/src/plan/with_options.rs b/src/sql/src/plan/with_options.rs index aa9f6840d3e39..0bbfdacef33fb 100644 --- a/src/sql/src/plan/with_options.rs +++ b/src/sql/src/plan/with_options.rs @@ -17,7 +17,8 @@ use mz_repr::bytes::ByteSize; use mz_repr::{CatalogItemId, RelationVersionSelector, strconv}; use mz_sql_parser::ast::{ ClusterAlterOptionValue, ClusterScheduleOptionValue, ConnectionDefaultAwsPrivatelink, Expr, - Ident, KafkaBroker, NetworkPolicyRuleDefinition, RefreshOptionValue, ReplicaDefinition, + Ident, KafkaBroker, KafkaMatchingBrokerRule, NetworkPolicyRuleDefinition, RefreshOptionValue, + ReplicaDefinition, }; use mz_storage_types::connections::IcebergCatalogType; use mz_storage_types::connections::string_or_secret::StringOrSecret; @@ -700,6 +701,7 @@ impl, T: AstInfo + std::fmt::Debug> TryFromValue, T: AstInfo + std::fmt::Debug> TryFromValue "exprs", WithOptionValue::ClusterReplicas(_) => "cluster replicas", WithOptionValue::ConnectionKafkaBroker(_) => "connection kafka brokers", - WithOptionValue::ConnectionAwsPrivatelink(_) => "connection kafka brokers", + WithOptionValue::ConnectionAwsPrivatelink(_) => "connection privatelink", + WithOptionValue::KafkaMatchingBrokerRule(_) => "matching broker rule", WithOptionValue::Refresh(_) => "refresh option values", WithOptionValue::ClusterScheduleOptionValue(_) => "cluster schedule", WithOptionValue::NetworkPolicyRules(_) => "network policy rules", @@ -857,12 +860,96 @@ impl TryFromValue> for 
ConnectionDefaultAwsPrivatelink } } +impl TryFromValue> for KafkaMatchingBrokerRule { + fn try_from_value(v: WithOptionValue) -> Result { + if let WithOptionValue::KafkaMatchingBrokerRule(r) = v { + Ok(r) + } else { + sql_bail!("cannot use value `{}` for a matching broker rule", v) + } + } + + fn try_into_value(self, _catalog: &dyn SessionCatalog) -> Option> { + Some(WithOptionValue::KafkaMatchingBrokerRule(self)) + } + + fn name() -> String { + "matching broker rule".to_string() + } +} + impl ImpliedValue for ConnectionDefaultAwsPrivatelink { fn implied_value() -> Result { sql_bail!("must provide a value") } } +impl ImpliedValue for KafkaMatchingBrokerRule { + fn implied_value() -> Result { + sql_bail!("must provide a value") + } +} + +/// A list of broker entries that can contain both static `KafkaBroker` entries +/// and `KafkaMatchingBrokerRule` entries (from `MATCHING` clauses in `BROKERS`). +#[derive(Debug)] +pub struct BrokersList { + pub static_entries: Vec>, + pub matching_rules: Vec>, +} + +impl TryFromValue> for BrokersList { + fn try_from_value(v: WithOptionValue) -> Result { + match v { + WithOptionValue::Sequence(entries) => { + let mut static_entries = vec![]; + let mut matching_rules = vec![]; + for entry in entries { + match entry { + WithOptionValue::ConnectionKafkaBroker(b) => static_entries.push(b), + WithOptionValue::KafkaMatchingBrokerRule(m) => matching_rules.push(m), + other => sql_bail!("unexpected value in BROKERS: {}", other), + } + } + Ok(BrokersList { + static_entries, + matching_rules, + }) + } + WithOptionValue::ConnectionKafkaBroker(b) => Ok(BrokersList { + static_entries: vec![b], + matching_rules: vec![], + }), + WithOptionValue::KafkaMatchingBrokerRule(m) => Ok(BrokersList { + static_entries: vec![], + matching_rules: vec![m], + }), + other => sql_bail!("cannot use {} as brokers list", other), + } + } + + fn try_into_value(self, _catalog: &dyn SessionCatalog) -> Option> { + let mut entries: Vec> = vec![]; + for b in 
self.static_entries { + entries.push(WithOptionValue::ConnectionKafkaBroker(b)); + } + for m in self.matching_rules { + entries.push(WithOptionValue::KafkaMatchingBrokerRule(m)); + } + Some(WithOptionValue::Sequence(entries)) + } + + fn name() -> String { + "brokers list".to_string() + } +} + +impl ImpliedValue for BrokersList { + fn implied_value() -> Result { + sql_bail!("must provide a value for BROKERS") + } +} + impl ImpliedValue for ClusterScheduleOptionValue { fn implied_value() -> Result { sql_bail!("must provide a cluster schedule option value") diff --git a/src/sql/src/session/vars/definitions.rs b/src/sql/src/session/vars/definitions.rs index 99a4148059c79..5a9f82a55477e 100644 --- a/src/sql/src/session/vars/definitions.rs +++ b/src/sql/src/session/vars/definitions.rs @@ -1934,6 +1934,12 @@ feature_flags!( default: true, enable_for_item_parsing: true, }, + { + name: enable_kafka_broker_matching_rules, + desc: "MATCHING broker rules in BROKERS for Kafka PrivateLink connections", + default: false, + enable_for_item_parsing: true, + }, { name: enable_alter_set_cluster, desc: "ALTER ... 
SET CLUSTER syntax", diff --git a/src/storage-types/src/connections.rs b/src/storage-types/src/connections.rs index 0ae008b7209ba..cd92e29065c82 100644 --- a/src/storage-types/src/connections.rs +++ b/src/storage-types/src/connections.rs @@ -31,7 +31,8 @@ use mz_ccsr::tls::{Certificate, Identity}; use mz_cloud_resources::{AwsExternalIdPrefix, CloudResourceReader, vpc_endpoint_host}; use mz_dyncfg::ConfigSet; use mz_kafka_util::client::{ - BrokerAddr, BrokerRewrite, MzClientContext, MzKafkaError, TunnelConfig, TunnelingClientContext, + BrokerAddr, BrokerRewrite, HostMappingRules, MzClientContext, MzKafkaError, TunnelConfig, + TunnelingClientContext, }; use mz_mysql_util::{MySqlConn, MySqlError}; use mz_ore::assert_none; @@ -41,6 +42,7 @@ use mz_ore::netio::resolve_address; use mz_ore::num::NonNeg; use mz_repr::{CatalogItemId, GlobalId}; use mz_secrets::SecretsReader; +use mz_sql_parser::ast::ConnectionRulePattern; use mz_ssh_util::keys::SshKeyPair; use mz_ssh_util::tunnel::SshTunnelConfig; use mz_ssh_util::tunnel_manager::{ManagedSshTunnelHandle, SshTunnelManager}; @@ -53,7 +55,7 @@ use serde::{Deserialize, Deserializer, Serialize}; use tokio::net; use tokio::runtime::Handle; use tokio_postgres::config::SslMode; -use tracing::{debug, warn}; +use tracing::{debug, info, warn}; use url::Url; use crate::AlterCompatible; @@ -957,15 +959,30 @@ impl KafkaConnection { t.port.unwrap_or(9092) ) } + Tunnel::AwsPrivatelinks(_pl) => { + let algo = KAFKA_DEFAULT_AWS_PRIVATELINK_ENDPOINT_IDENTIFICATION_ALGORITHM + .get(storage_configuration.config_set()); + options.insert("ssl.endpoint.identification.algorithm".into(), algo.into()); + + if self.brokers.is_empty() { + return Err(ContextCreationError::Other(anyhow::anyhow!( + "at least one static broker is required when using BROKER or BROKERS" + ))); + } + self.brokers.iter().map(|b| &b.address).join(",") + } _ => self.brokers.iter().map(|b| &b.address).join(","), }; - options.insert("bootstrap.servers".into(), brokers.into()); + 
options.insert("bootstrap.servers".into(), brokers.clone().into()); let security_protocol = match (self.tls.is_some(), self.sasl.is_some()) { (false, false) => "PLAINTEXT", (true, false) => "SSL", (false, true) => "SASL_PLAINTEXT", (true, true) => "SASL_SSL", }; + info!( + "kafka: create_with_context bootstrap.servers={brokers}, security_protocol={security_protocol}" + ); options.insert("security.protocol".into(), security_protocol.into()); if let Some(tls) = &self.tls { if let Some(root_cert) = &tls.root_cert { @@ -1067,10 +1084,15 @@ impl KafkaConnection { // By default, don't offer a default override for broker address lookup. } Tunnel::AwsPrivatelink(pl) => { - context.set_default_tunnel(TunnelConfig::StaticHost(vpc_endpoint_host( - pl.connection_id, - None, // Default tunnel does not support availability zones. - ))); + context.set_default_tunnel(TunnelConfig::StaticHost( + // Possible bug: We have been ignoring the configured port. + KafkaConnection::from_default_aws_privatelink(pl).host, + )); + } + Tunnel::AwsPrivatelinks(pl) => { + context.set_default_tunnel(TunnelConfig::Rules( + KafkaConnection::from_aws_privatelinks(pl), + )); } Tunnel::Ssh(ssh_tunnel) => { let secret = storage_configuration @@ -1097,7 +1119,19 @@ impl KafkaConnection { })); } } + info!( + "kafka: tunnel config set to {}", + match &self.default_tunnel { + Tunnel::Direct => "Direct".to_string(), + Tunnel::AwsPrivatelink(_) => "AwsPrivatelink (static host)".to_string(), + Tunnel::AwsPrivatelinks(pl) => + format!("AwsPrivatelinks ({} rules)", pl.rules.len()), + Tunnel::Ssh(_) => "Ssh".to_string(), + } + ); + // Here, we preemptively rewrite broker addresses. + // In concept, this overlaps with 'TunnelingClientContext::resolve_broker_addr'. for broker in &self.brokers { let mut addr_parts = broker.address.splitn(2, ':'); let addr = BrokerAddr { @@ -1124,19 +1158,14 @@ impl KafkaConnection { // in the `TunnelingClientContext`. 
} Tunnel::AwsPrivatelink(aws_privatelink) => { - let host = mz_cloud_resources::vpc_endpoint_host( - aws_privatelink.connection_id, - aws_privatelink.availability_zone.as_deref(), - ); - let port = aws_privatelink.port; context.add_broker_rewrite( addr, - BrokerRewrite { - host: host.clone(), - port, - }, + KafkaConnection::from_aws_privatelink(aws_privatelink), ); } + Tunnel::AwsPrivatelinks(_) => unreachable!( + "Individually predefined brokers do not use rule-based PrivateLinks routing." + ), Tunnel::Ssh(ssh_tunnel) => { // Ensure any SSH bastion address we connect to is resolved to an external address. let ssh_host_resolved = resolve_address( @@ -1204,11 +1233,16 @@ impl KafkaConnection { // The downside of this approach is it produces a generic error message like // "metadata fetch error" with no additional details. The real networking // error is buried in the librdkafka logs, which are not visible to users. + info!("kafka: starting connection validation via fetch_metadata (timeout={timeout:?})"); let result = mz_ore::task::spawn_blocking(|| "kafka_get_metadata", { let consumer = Arc::clone(&consumer); move || consumer.fetch_metadata(None, timeout) }) .await; + info!( + "kafka: connection validation result: {}", + if result.is_ok() { "success" } else { "failed" }, + ); match result { Ok(_) => Ok(()), // The error returned by `fetch_metadata` does not provide any details which makes for @@ -1236,6 +1270,48 @@ impl KafkaConnection { } } } + + /// The "default" PrivateLink connection is used for bootstrapping Kafka. + fn from_default_aws_privatelink(pl: &AwsPrivatelink) -> BrokerRewrite { + BrokerRewrite { + host: vpc_endpoint_host( + pl.connection_id, + None, // Default tunnel does not support availability zones. + ), + port: pl.port, + } + } + + /// The "not default" PrivateLink connections are used for routing to specific Kafka brokers. 
+ fn from_aws_privatelink(pl: &AwsPrivatelink) -> BrokerRewrite { + BrokerRewrite { + host: vpc_endpoint_host(pl.connection_id, pl.availability_zone.as_deref()), + port: pl.port, + } + } + + fn from_aws_privatelink_rule( + AwsPrivatelinkRule { pattern, to }: &AwsPrivatelinkRule, + ) -> (mz_kafka_util::client::ConnectionRulePattern, BrokerRewrite) { + ( + mz_kafka_util::client::ConnectionRulePattern { + prefix_wildcard: pattern.prefix_wildcard, + literal_match: pattern.literal_match.clone(), + suffix_wildcard: pattern.suffix_wildcard, + }, + KafkaConnection::from_aws_privatelink(to), + ) + } + + fn from_aws_privatelinks(pl: &AwsPrivatelinks) -> HostMappingRules { + HostMappingRules { + rules: pl + .rules + .iter() + .map(KafkaConnection::from_aws_privatelink_rule) + .collect_vec(), + } + } } impl AlterCompatible for KafkaConnection { @@ -1454,6 +1530,9 @@ impl CsrConnection { .collect(); client_config = client_config.resolve_to_addrs(host, &addrs) } + Tunnel::AwsPrivatelinks(_) => { + unreachable!("MATCHING broker rules are only available for Kafka connections."); + } } Ok(client_config.build()?) @@ -1712,6 +1791,9 @@ impl PostgresConnection { connection_id: connection.connection_id, } } + Tunnel::AwsPrivatelinks(_) => { + unreachable!("MATCHING broker rules are only available for Kafka connections."); + } }; Ok(mz_postgres_util::Config::new( @@ -1847,6 +1929,7 @@ pub enum Tunnel { Ssh(SshTunnel), /// Via the specified AWS PrivateLink connection. 
AwsPrivatelink(AwsPrivatelink), + AwsPrivatelinks(AwsPrivatelinks), } impl IntoInlineConnection for Tunnel { @@ -1855,6 +1938,7 @@ impl IntoInlineConnection for Tunnel Tunnel::Direct, Tunnel::Ssh(ssh) => Tunnel::Ssh(ssh.into_inline_connection(r)), Tunnel::AwsPrivatelink(awspl) => Tunnel::AwsPrivatelink(awspl), + Tunnel::AwsPrivatelinks(x) => Tunnel::AwsPrivatelinks(x), } } } @@ -2064,6 +2148,9 @@ impl MySqlConnection { connection_id: connection.connection_id, } } + Tunnel::AwsPrivatelinks(_) => { + unreachable!("MATCHING broker rules are only available for Kafka connections."); + } }; let aws_config = match self.aws_connection.as_ref() { @@ -2390,6 +2477,9 @@ impl SqlServerConnectionDetails { port: self.port, } } + Tunnel::AwsPrivatelinks(_) => { + unreachable!("MATCHING broker rules are only available for Kafka connections."); + } }; Ok(mz_sql_server_util::Config::new( @@ -2553,6 +2643,22 @@ impl AlterCompatible for AwsPrivatelink { } } +#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)] +pub struct AwsPrivatelinks { + /// Route to brokers through PrivateLink connections according to these rules. + /// Exact-match rules (no wildcards) are used as bootstrap brokers. + /// Wildcard rules are applied dynamically to discovered brokers. + pub rules: Vec, +} + +#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)] +pub struct AwsPrivatelinkRule { + /// Given a broker's host:port, should we use this route? + pub pattern: ConnectionRulePattern, + /// Route to the broker through this PrivateLink connection. + pub to: AwsPrivatelink, +} + /// Specifies an SSH tunnel connection. #[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)] pub struct SshTunnel { diff --git a/test/testdrive/connection-alter.td b/test/testdrive/connection-alter.td index 90f9899a5e6c7..7e7d7cb50da6c 100644 --- a/test/testdrive/connection-alter.td +++ b/test/testdrive/connection-alter.td @@ -50,7 +50,7 @@ first second 1 2 ! 
ALTER CONNECTION conn RESET (broker); -contains:invalid ALTER CONNECTION: invalid CONNECTION: must set one of BROKER, BROKERS, or AWS PRIVATELINK +contains:set one of BROKER, BROKERS, or AWS PRIVATELINK ! ALTER CONNECTION conn SET (broker = 'abcd') WITH (validate = true); contains:Failed to resolve hostname @@ -111,7 +111,7 @@ contains:BROKER specified more than once contains:BROKER specified more than once ! ALTER CONNECTION conn SET (BROKER '${testdrive.kafka-addr}'), SET (BROKERS ['${testdrive.kafka-addr}']) -contains:invalid ALTER CONNECTION: invalid CONNECTION: can only set one of BROKER, BROKERS, or AWS PRIVATELINK +contains:can only set one of BROKER, BROKERS, or AWS PRIVATELINK ! ALTER CONNECTION conn SET (BROKER '${testdrive.kafka-addr}'), DROP (BROKERS) contains:cannot both SET and DROP/RESET mutually exclusive KAFKA options BROKER, BROKERS @@ -121,7 +121,7 @@ contains:cannot both SET and DROP/RESET mutually exclusive KAFKA options BROKER, # We permit resetting both of these options, and the error occurs later in planning ! ALTER CONNECTION conn RESET (BROKER), RESET (BROKERS); -contains:invalid ALTER CONNECTION: invalid CONNECTION: must set one of BROKER, BROKERS, or AWS PRIVATELINK +contains:set one of BROKER, BROKERS, or AWS PRIVATELINK > ALTER CONNECTION conn SET (BROKER = '${testdrive.kafka-addr}'); diff --git a/test/testdrive/connection-create-drop.td b/test/testdrive/connection-create-drop.td index 059895f18b8ca..408a586f1a43f 100644 --- a/test/testdrive/connection-create-drop.td +++ b/test/testdrive/connection-create-drop.td @@ -479,6 +479,63 @@ contains: HOST option is required ! CREATE CONNECTION conn1 TO KAFKA (BROKER '${testdrive.kafka-addr}' USING AWS PRIVATELINK foo (PORT 9093), SECURITY PROTOCOL PLAINTEXT); contains: unknown catalog item 'foo' +! 
CREATE CONNECTION conn1 TO KAFKA ( + BROKERS ( + MATCHING '*.use1-az1.*' USING AWS PRIVATELINK foo (AVAILABILITY ZONE = 'use1-az1'), + MATCHING '*.use1-az4.*' USING AWS PRIVATELINK foo (PORT 9093, AVAILABILITY ZONE = 'use1-az4') + ), + SECURITY PROTOCOL PLAINTEXT + ); +contains: unknown catalog item 'foo' + +## MATCHING broker rules for PrivateLink + +$ postgres-execute connection=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr} +ALTER SYSTEM SET max_aws_privatelink_connections = 5; + +> CREATE CONNECTION pl_for_matching TO AWS PRIVATELINK ( + SERVICE NAME 'com.amazonaws.vpce.us-east-1.vpce-svc-0e123abc123198abc', + AVAILABILITY ZONES ('use1-az1', 'use1-az4') + ) + +# MATCHING requires the feature flag +! CREATE CONNECTION kafka_matching_no_flag TO KAFKA ( + BROKERS ( + 'broker:9092', + MATCHING '*' USING AWS PRIVATELINK pl_for_matching + ), + SECURITY PROTOCOL PLAINTEXT + ) WITH (VALIDATE = false); +contains: MATCHING broker rules in BROKERS for Kafka PrivateLink connections is not available + +$ postgres-execute connection=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr} +ALTER SYSTEM SET enable_kafka_broker_matching_rules = true; + +# MATCHING-only BROKERS without a static broker is rejected +! 
CREATE CONNECTION kafka_matching_no_static TO KAFKA ( + BROKERS ( + MATCHING '*.use1-az1.*' USING AWS PRIVATELINK pl_for_matching (AVAILABILITY ZONE = 'use1-az1') + ), + SECURITY PROTOCOL PLAINTEXT + ) WITH (VALIDATE = false); +contains: BROKERS must contain at least one static broker address + +# Static broker + MATCHING works (validate = false, we just test planning) +> CREATE CONNECTION kafka_matching_ok TO KAFKA ( + BROKERS ( + 'broker:9092' USING AWS PRIVATELINK pl_for_matching, + MATCHING '*.use1-az1.*' USING AWS PRIVATELINK pl_for_matching (AVAILABILITY ZONE = 'use1-az1'), + MATCHING '*.use1-az4.*' USING AWS PRIVATELINK pl_for_matching (AVAILABILITY ZONE = 'use1-az4') + ), + SECURITY PROTOCOL PLAINTEXT + ) WITH (VALIDATE = false) + +> DROP CONNECTION kafka_matching_ok + +$ postgres-execute connection=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr} +ALTER SYSTEM RESET enable_kafka_broker_matching_rules; +ALTER SYSTEM RESET max_aws_privatelink_connections; + ! CREATE CONNECTION conn1 TO CONFLUENT SCHEMA REGISTRY (AWS PRIVATELINK foo, PORT 8080) contains: unknown catalog item 'foo'