kafka: add MATCHING broker rules for PrivateLink AZ mapping#36161

Open
jubrad wants to merge 1 commit into MaterializeInc:main from jubrad:jubrad/kafka-privatelink-bootstrap-broker
Conversation

@jubrad
Contributor

@jubrad jubrad commented Apr 20, 2026

Follow-up to #35455

Summary

  • Adds BOOTSTRAP BROKER 'addr' USING AWS PRIVATELINK conn (...) — new top-level option that provides the initial bootstrap address with an explicit PrivateLink tunnel, preserving the real hostname for correct TLS SNI
  • Adds MATCHING 'pattern' USING AWS PRIVATELINK conn (...) inside BROKERS (...) — pattern-based routing rules for dynamically discovered brokers returned in Kafka metadata
  • Replaces the AWS PRIVATELINKS syntax which used a TO keyword inconsistent with the existing USING AWS PRIVATELINK syntax and overloaded exact-match patterns as implicit bootstrap addresses
  • Updates parser, planner, and storage layer to support the new constructs

Example

CREATE CONNECTION kafka TO KAFKA (
    BROKERS (
        'lkc-825730.endpoint.cloud:9092' USING AWS PRIVATELINK pl_conn,
        MATCHING '*use1-az1*' USING AWS PRIVATELINK pl_conn (AVAILABILITY ZONE 'use1-az1'),
        MATCHING '*use1-az4*' USING AWS PRIVATELINK pl_conn (AVAILABILITY ZONE 'use1-az4'),
        MATCHING '*use1-az6*' USING AWS PRIVATELINK pl_conn (AVAILABILITY ZONE 'use1-az6')
    ),
    SASL MECHANISMS 'PLAIN',
    SASL USERNAME 'key',
    SASL PASSWORD SECRET secret,
    SECURITY PROTOCOL 'SASL_SSL'
);
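
To illustrate how the MATCHING rules in the example above are meant to be applied to brokers discovered from Kafka metadata, here is a minimal Rust sketch. It is not the code in this PR: it models only the `*fragment*` form used in the example and assumes the first matching rule wins. `MatchingRule`, `route_broker`, and `example_rules` are hypothetical names.

```rust
/// Hypothetical model of one `MATCHING '*fragment*'` rule from the example.
struct MatchingRule {
    /// Literal text between the leading and trailing `*`.
    fragment: &'static str,
    /// Availability zone ID of the PrivateLink endpoint to route through.
    availability_zone: &'static str,
}

/// Returns the availability zone of the first rule whose fragment occurs in
/// `broker_addr`, or `None` if no rule applies. First-match-wins ordering is
/// an assumption of this sketch.
fn route_broker<'a>(rules: &'a [MatchingRule], broker_addr: &str) -> Option<&'a str> {
    rules
        .iter()
        .find(|rule| broker_addr.contains(rule.fragment))
        .map(|rule| rule.availability_zone)
}

/// The three MATCHING rules from the example connection.
fn example_rules() -> Vec<MatchingRule> {
    vec![
        MatchingRule { fragment: "use1-az1", availability_zone: "use1-az1" },
        MatchingRule { fragment: "use1-az4", availability_zone: "use1-az4" },
        MatchingRule { fragment: "use1-az6", availability_zone: "use1-az6" },
    ]
}
```

With these rules, a discovered broker address that contains `use1-az4` (for instance a made-up hostname like `b0-lkc-825730.use1-az4.endpoint.cloud:9092`) would route through the `use1-az4` endpoint, while an address containing none of the fragments matches no rule.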

Test plan

  • Parser tests updated (roundtrip, error cases)
  • Testdrive updated (connection-create-drop, connection-alter)
  • cargo check passes
  • Cloudtest validation (see follow-up PR)
  • Manual testing against Confluent Cloud PrivateLink

Docs

🤖 Generated with Claude Code

@jubrad
Contributor Author

jubrad commented Apr 21, 2026

Note on build_tunnel_definition removal from StatementContext:

On main, build_tunnel_definition is a method on StatementContext taking 2 params (ssh_tunnel, aws_privatelink). All 5 callers (Kafka, CSR, Postgres, MySQL, SQL Server) call it via scx.build_tunnel_definition(...).

This PR moves it to a standalone function in connection.rs with a 3rd param (matching_rules) to support the Kafka MATCHING rules path. The non-Kafka callers pass None for matching rules. Since all callers now use the standalone function, the original method on StatementContext has zero callers and is removed.

This is intentional — one function handles all connection types rather than maintaining two parallel implementations.
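
A rough sketch of the shape this takes, for readers who have not opened the diff. The real parameter types, return type, and conflict handling are not shown in this thread, so everything below other than the function name and the three parameter names is an assumption: `Tunnel`, the `String` stand-ins, the error text, and `kafka_matching_arg` are all hypothetical.

```rust
/// Hypothetical stand-in for the planner's tunnel type.
#[derive(Debug, PartialEq)]
enum Tunnel {
    Direct,
    Ssh(String),
    AwsPrivatelink(String),
    /// PrivateLink routed via per-broker MATCHING rules (Kafka only).
    AwsPrivatelinkRules(Vec<String>),
}

/// One standalone function for every connection type. Non-Kafka callers
/// pass `None` for `matching_rules`.
fn build_tunnel_definition(
    ssh_tunnel: Option<String>,
    aws_privatelink: Option<String>,
    matching_rules: Option<Vec<String>>,
) -> Result<Tunnel, String> {
    match (ssh_tunnel, aws_privatelink, matching_rules) {
        (None, None, None) => Ok(Tunnel::Direct),
        (Some(ssh), None, None) => Ok(Tunnel::Ssh(ssh)),
        (None, Some(pl), None) => Ok(Tunnel::AwsPrivatelink(pl)),
        (None, None, Some(rules)) => Ok(Tunnel::AwsPrivatelinkRules(rules)),
        // Assumed behavior: combining tunnel options is an error.
        _ => Err("cannot specify more than one tunnel option".to_string()),
    }
}

/// Mirrors how the Kafka caller turns an empty rule list into `None`.
fn kafka_matching_arg(matching_rules: Vec<String>) -> Option<Vec<String>> {
    if matching_rules.is_empty() {
        None
    } else {
        Some(matching_rules)
    }
}
```

In this model a Postgres-style caller would write `build_tunnel_definition(ssh, pl, None)`, and the Kafka caller would pass `kafka_matching_arg(rules)` as the third argument.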

@jubrad jubrad requested a review from ublubu April 21, 2026 04:21
Comment on lines +340 to +341
/// If true, allow any combination of characters after the literal match.
pub suffix_wildcard: bool,
Contributor Author


I'm wondering whether we actually need a suffix wildcard. If this feature is restricted to Confluent, and Confluent provides a suffix, then we only need prefix wildcard matching. I don't think this hurts, but it's not strictly necessary.
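
For context, here is a minimal sketch of how the two wildcard flags could interact at match time. Only `suffix_wildcard` and its doc comment come from the diff; the struct name `BrokerMatchPattern`, the `prefix_wildcard` and `literal` fields, and the `matches` method are assumptions made for illustration.

```rust
/// Hypothetical pattern type for a MATCHING rule.
struct BrokerMatchPattern {
    /// If true, allow any combination of characters before the literal match.
    prefix_wildcard: bool,
    /// The literal text that must appear in the broker address.
    literal: String,
    /// If true, allow any combination of characters after the literal match.
    suffix_wildcard: bool,
}

impl BrokerMatchPattern {
    /// Checks a broker address against the pattern.
    fn matches(&self, addr: &str) -> bool {
        let literal = self.literal.as_str();
        match (self.prefix_wildcard, self.suffix_wildcard) {
            // No wildcards: the address must equal the literal exactly.
            (false, false) => addr == literal,
            // `*literal`: anything may precede the literal.
            (true, false) => addr.ends_with(literal),
            // `literal*`: anything may follow the literal.
            (false, true) => addr.starts_with(literal),
            // `*literal*`: the literal may appear anywhere.
            (true, true) => addr.contains(literal),
        }
    }
}
```

If every broker hostname ends in a known suffix, the `(true, false)` arm alone is enough, which is the prefix-only case described above.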

SECURITY PROTOCOL PLAINTEXT
);
-contains:must set one of BROKER, BROKERS, or AWS PRIVATELINK
+contains:must set one of BROKER, BROKERS, BOOTSTRAP BROKER, or AWS PRIVATELINK
Contributor Author

@jubrad jubrad Apr 21, 2026


I think this makes the syntax for this feature quite clear, but I'm very glad we got rid of the railroad track diagram because the syntax is getting wild.

  • BROKER implies a single broker; it's exclusive with BROKERS, but both set the bootstrap broker
  • AWS PRIVATELINK only works when a single top-level PrivateLink is provided with no individual broker matching, although perhaps one could provide BROKER
  • BROKERS is the standard way of mapping brokers to AWS PrivateLink for everything but Redpanda

I really don't know the best way to land things here.

  1. keep things as they are in this PR
  2. allow BROKER and BROKERS, where BROKER is the bootstrap broker and optional, but we error if no static broker is provided
  3. we get rid of bootstrap brokers and require at least one static broker if the user specifies BROKERS

Contributor

@ublubu ublubu Apr 21, 2026


What's the "railroad track diagram"?

Re: We need at least one static broker to bootstrap from.

get rid of BOOTSTRAP BROKER and require at least one static broker in BROKERS

I like this one. It feels clearer than "if you only have MATCHING in BROKERS, you also need a BOOTSTRAP BROKER".

Then BROKER is subsumed by BROKERS and we can just about replace AWS PRIVATELINK with a static broker (static address = PrivateLink address) and a MATCHING '*' rule.
Not saying we remove the other ones, but at least we can focus on just BROKERS in docs, etc.

Contributor Author


What's the "railroad track diagram"?

We used to generate diagrams for all SQL from BNF that we had to maintain alongside every SQL change. These diagrams outlined the SQL syntax, which in this case would be super complicated.

get rid of BOOTSTRAP BROKER and require at least one static broker in BROKERS
I like this one

Cool, I'll make this change
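
A minimal sketch of the agreed rule (no BOOTSTRAP BROKER, and BROKERS must contain at least one static broker to bootstrap from). The `BrokerEntry` enum, the `validate_brokers` function, and the error text are hypothetical and only illustrate the check, not how the planner implements it.

```rust
/// Hypothetical model of one entry inside `BROKERS (...)`.
enum BrokerEntry {
    /// A static address such as 'lkc-825730.endpoint.cloud:9092'.
    Static(String),
    /// A `MATCHING 'pattern'` routing rule.
    Matching(String),
}

/// Rejects a BROKERS list that has no static broker to bootstrap from.
fn validate_brokers(entries: &[BrokerEntry]) -> Result<(), String> {
    let has_static = entries
        .iter()
        .any(|entry| matches!(entry, BrokerEntry::Static(_)));
    if has_static {
        Ok(())
    } else {
        // Placeholder wording; the real error message is not shown here.
        Err("BROKERS must include at least one static broker".to_string())
    }
}
```

Under this rule a list containing only MATCHING entries is rejected, while the same list with one static address added is accepted.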

@jubrad jubrad marked this pull request as ready for review April 21, 2026 15:57
@jubrad jubrad requested review from a team as code owners April 21, 2026 15:57
@jubrad jubrad requested review from ggevay and removed request for ggevay April 21, 2026 15:57
@jubrad
Contributor Author

jubrad commented Apr 21, 2026

Contributor

@def- def- left a comment


mode cockroach

simple conn=mz_system,user=mz_system
ALTER SYSTEM SET max_aws_privatelink_connections = 10;
----
COMPLETE 0

statement ok
CREATE CONNECTION pl TO AWS PRIVATELINK (
    SERVICE NAME 'com.amazonaws.vpce.us-east-1.vpce-svc-0e123abc123198abc',
    AVAILABILITY ZONES ('use1-az1', 'use1-az4')
);

statement error cannot specify BROKER, BOOTSTRAP BROKER, or static BROKERS entries alongside top-level AWS PRIVATELINK
CREATE CONNECTION kafka_bad TO KAFKA (
    BROKER 'kafka:9092',
    AWS PRIVATELINK pl (PORT 9092),
    SECURITY PROTOCOL PLAINTEXT
);

Fails with a panic:

2026-04-22T02:23:43.120459Z ERROR mz_adapter::coord::sequencer::inner: connection validation panicked

Also this one:

mode cockroach

simple conn=mz_system,user=mz_system
ALTER SYSTEM SET max_aws_privatelink_connections = 10;
----
COMPLETE 0

simple conn=mz_system,user=mz_system
ALTER SYSTEM SET enable_kafka_broker_matching_rules = true;
----
COMPLETE 0

statement ok
CREATE CONNECTION pl TO AWS PRIVATELINK (
    SERVICE NAME 'com.amazonaws.vpce.us-east-1.vpce-svc-0e123abc123198abc',
    AVAILABILITY ZONES ('use1-az1', 'use1-az4')
);

statement error BOOTSTRAP BROKER is required when using MATCHING broker rules
CREATE CONNECTION kafka_matching_no_bootstrap TO KAFKA (
    BROKERS (
        MATCHING '*.use1-az1.*' USING AWS PRIVATELINK pl (AVAILABILITY ZONE = 'use1-az1'),
        MATCHING '*.use1-az4.*' USING AWS PRIVATELINK pl (AVAILABILITY ZONE = 'use1-az4')
    ),
    SECURITY PROTOCOL PLAINTEXT
) WITH (VALIDATE = FALSE);

This one fails too. We should reject CREATE CONNECTION without a BOOTSTRAP BROKER:

    CREATE CONNECTION kafka_matching_no_bootstrap TO KAFKA (
        BROKERS (
            MATCHING '*.use1-az1.*' USING AWS PRIVATELINK pl (AVAILABILITY ZONE = 'use1-az1'),
            MATCHING '*.use1-az4.*' USING AWS PRIVATELINK pl (AVAILABILITY ZONE = 'use1-az4')
        ),
        SECURITY PROTOCOL PLAINTEXT
    ) WITH (VALIDATE = FALSE);
    UnexpectedPlanSuccess:test/sqllogictest/kafka_matching_requires_bootstrap_broker.

@jubrad jubrad force-pushed the jubrad/kafka-privatelink-bootstrap-broker branch from 924a1ff to a1fc2ad Compare April 22, 2026 03:12
@jubrad jubrad changed the title kafka: add BOOTSTRAP BROKER and MATCHING broker rules for PrivateLink kafka: add MATCHING broker rules for PrivateLink AZ mapping Apr 22, 2026
@jubrad jubrad force-pushed the jubrad/kafka-privatelink-bootstrap-broker branch 2 times, most recently from 98a1e05 to a9e6f38 Compare April 22, 2026 19:17
@jubrad jubrad requested review from a team as code owners April 22, 2026 19:17
@jubrad jubrad requested a review from def- April 22, 2026 19:25
Comment thread src/kafka-util/src/client.rs
Comment thread src/sql-parser/src/ast/defs/statement.rs Outdated
scx,
self.ssh_tunnel,
self.aws_privatelink,
if matching_rules.is_empty() { None } else { Some(matching_rules) },
Contributor


Interesting.

Comment thread src/sql/src/plan/statement/ddl/connection.rs Outdated
Comment thread src/sql/src/plan/statement/ddl/connection.rs Outdated
Comment thread src/sql/src/plan/statement/ddl/connection.rs Outdated
Comment thread src/storage-types/src/connections.rs Outdated
Comment thread src/storage-types/src/connections.rs Outdated
@jubrad jubrad force-pushed the jubrad/kafka-privatelink-bootstrap-broker branch from 26a24d1 to 4b4f190 Compare April 23, 2026 01:10
Introduces two new SQL constructs for Kafka PrivateLink connections:

- `BOOTSTRAP BROKER 'addr' USING AWS PRIVATELINK conn (...)` — provides
  the initial bootstrap address with an explicit PrivateLink tunnel. The
  bootstrap address is used as `bootstrap.servers` and the real hostname
  is preserved for correct TLS SNI.

- `MATCHING 'pattern' USING AWS PRIVATELINK conn (...)` inside `BROKERS`
  — pattern-based routing rules for dynamically discovered brokers.
  After the initial metadata fetch, Kafka returns broker addresses that
  may differ from the bootstrap address (e.g., AZ-specific hostnames).
  MATCHING rules route these through the correct PrivateLink endpoint.

This replaces the `AWS PRIVATELINKS` syntax which used exact-match
patterns for dual-purpose bootstrap/routing and a separate `TO` keyword
inconsistent with the existing `USING AWS PRIVATELINK` syntax.

Example:
```sql
CREATE CONNECTION kafka TO KAFKA (
    BOOTSTRAP BROKER 'lkc-825730.endpoint.cloud:9092'
        USING AWS PRIVATELINK pl_conn (AVAILABILITY ZONE 'use1-az1'),
    BROKERS (
        MATCHING '*use1-az1*' USING AWS PRIVATELINK pl_conn (AVAILABILITY ZONE 'use1-az1'),
        MATCHING '*use1-az4*' USING AWS PRIVATELINK pl_conn (AVAILABILITY ZONE 'use1-az4')
    ),
    SASL MECHANISMS 'PLAIN',
    SASL USERNAME 'key',
    SASL PASSWORD SECRET secret,
    SECURITY PROTOCOL 'SASL_SSL'
);
```

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@jubrad jubrad force-pushed the jubrad/kafka-privatelink-bootstrap-broker branch from 4b4f190 to c9e6e0b Compare April 23, 2026 01:33
@jubrad jubrad requested a review from ublubu April 23, 2026 02:04
Contributor

@def- def- left a comment


Minor thing, we should reject invalid MATCHING strings:

diff --git a/src/sql-parser/tests/testdata/ddl b/src/sql-parser/tests/testdata/ddl
index ea60378216..265a3608b2 100644
--- a/src/sql-parser/tests/testdata/ddl
+++ b/src/sql-parser/tests/testdata/ddl
@@ -671,6 +671,37 @@ error: Expected USING, found right parenthesis
 CREATE CONNECTION kafka_connection TO KAFKA (BROKERS (MATCHING '*.az.*'))
                                                                        ^

+# Regression: MATCHING patterns must reject internal `*` wildcards. Only a
+# single leading and/or trailing `*` is supported; anything else silently
+# produces a rule that can never match a real broker hostname.
+parse-statement
+CREATE CONNECTION kafka_connection TO KAFKA (BROKERS (MATCHING '*foo*bar*' USING AWS PRIVATELINK privatelink_svc (AVAILABILITY ZONE 'use1-az1')))
+----
+error: pattern may only contain `*` as a leading and/or trailing wildcard
+CREATE CONNECTION kafka_connection TO KAFKA (BROKERS (MATCHING '*foo*bar*' USING AWS PRIVATELINK privatelink_svc (AVAILABILITY ZONE 'use1-az1')))
+                                                                        ^
+
+parse-statement
+CREATE CONNECTION kafka_connection TO KAFKA (BROKERS (MATCHING 'foo*bar' USING AWS PRIVATELINK privatelink_svc (AVAILABILITY ZONE 'use1-az1')))
+----
+error: pattern may only contain `*` as a leading and/or trailing wildcard
+CREATE CONNECTION kafka_connection TO KAFKA (BROKERS (MATCHING 'foo*bar' USING AWS PRIVATELINK privatelink_svc (AVAILABILITY ZONE 'use1-az1')))
+                                                                      ^
+
+parse-statement
+CREATE CONNECTION kafka_connection TO KAFKA (BROKERS (MATCHING '**az1**' USING AWS PRIVATELINK privatelink_svc (AVAILABILITY ZONE 'use1-az1')))
+----
+error: pattern may only contain `*` as a leading and/or trailing wildcard
+CREATE CONNECTION kafka_connection TO KAFKA (BROKERS (MATCHING '**az1**' USING AWS PRIVATELINK privatelink_svc (AVAILABILITY ZONE 'use1-az1')))
+                                                                      ^
+
+parse-statement
+CREATE CONNECTION kafka_connection TO KAFKA (BROKERS (MATCHING 'broker-*-az1-*.host.com' USING AWS PRIVATELINK privatelink_svc (AVAILABILITY ZONE 'use1-az1')))
+----
+error: pattern may only contain `*` as a leading and/or trailing wildcard
+CREATE CONNECTION kafka_connection TO KAFKA (BROKERS (MATCHING 'broker-*-az1-*.host.com' USING AWS PRIVATELINK privatelink_svc (AVAILABILITY ZONE 'use1-az1')))
+                                                                                      ^
+
 parse-statement
 CREATE SOURCE mz_source FROM MYSQL CONNECTION mysqlconn FOR TABLES (foo, bar as qux, baz into zop);
 ----

Runs with cargo test -p mz-sql-parser --test sqlparser_common datadriven
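
One way the check could look, as a sketch only: strip at most one leading and one trailing `*`, then reject any `*` left in the middle. The function name `parse_matching_pattern` and the tuple return type are hypothetical; the error text is copied from the expected output in the tests above.

```rust
/// Splits a MATCHING pattern into (prefix_wildcard, literal, suffix_wildcard),
/// rejecting any `*` that is not a single leading or trailing wildcard.
fn parse_matching_pattern(pattern: &str) -> Result<(bool, &str, bool), String> {
    // Strip at most one leading `*`.
    let (prefix_wildcard, rest) = match pattern.strip_prefix('*') {
        Some(rest) => (true, rest),
        None => (false, pattern),
    };
    // Strip at most one trailing `*` from what remains.
    let (suffix_wildcard, literal) = match rest.strip_suffix('*') {
        Some(literal) => (true, literal),
        None => (false, rest),
    };
    // Anything still containing `*` has an internal or doubled wildcard.
    if literal.contains('*') {
        return Err(
            "pattern may only contain `*` as a leading and/or trailing wildcard".to_string(),
        );
    }
    Ok((prefix_wildcard, literal, suffix_wildcard))
}
```

This accepts `'*use1-az1*'` and rejects all four patterns in the regression tests, including the doubled `'**az1**'` form, because only one `*` is stripped from each end before the internal check.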

3 participants