Skip to content

Commit 77a3936

Browse files
committed
feat(helm): add multi-cluster Kafka support with configurable clusters, per-cluster overrides, and enhanced Kerberos and JAAS options
1 parent b92a726 commit 77a3936

8 files changed

Lines changed: 513 additions & 124 deletions

File tree

kafka/README.md

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,3 +30,49 @@ API-family switches:
3030
For production, prefer providing an external Kerberos secret or RWX volume containing `client/krb5.conf` and service keytabs. When using a secret, set `kerberos.existingSecret` and map keys into nested paths with `kerberos.existingSecretItems`, for example `[{key: krb5.conf, path: client/krb5.conf}, {key: kafka-0.keytab, path: keytabs/kafka-0.keytab}]`.
3131

3232
Kerberos clients should use the StatefulSet broker DNS names from the headless service. If exposing Kafka through an Istio TCP route, add a matching `kafka/<external-host>@REALM` service principal and keytab, or override the advertised listener and Kerberos principal pattern accordingly.
33+
34+
## Multi-cluster example
35+
36+
```yaml
37+
kafka:
38+
clusters:
39+
- name: primary
40+
clusterId: MkU3OEVBNTcwNTJENDM2Qk
41+
- name: standby
42+
clusterId: zlFiTJelTOuhnklFwLWixw
43+
44+
schemaRegistry:
45+
kafkaCluster: primary
46+
47+
topicInit:
48+
kafkaCluster: primary
49+
50+
kafbatUi:
51+
autoClusters: true
52+
clusters: []
53+
54+
mirrorMaker:
55+
enabled: true
56+
properties: |
57+
clusters = primary, standby
58+
59+
primary.bootstrap.servers = kafka-kafka-primary-0.kafka-kafka-primary-headless.kafka.svc.cluster.local:9092,kafka-kafka-primary-1.kafka-kafka-primary-headless.kafka.svc.cluster.local:9092
60+
standby.bootstrap.servers = kafka-kafka-standby-0.kafka-kafka-standby-headless.kafka.svc.cluster.local:9092,kafka-kafka-standby-1.kafka-kafka-standby-headless.kafka.svc.cluster.local:9092
61+
62+
primary.security.protocol = SASL_PLAINTEXT
63+
primary.sasl.mechanism = GSSAPI
64+
primary.sasl.kerberos.service.name = kafka
65+
primary.sasl.jaas.config = com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab="/kerby/keytabs/mm2.keytab" principal="mm2/mm2.example.com@EXAMPLE.COM";
66+
67+
standby.security.protocol = SASL_PLAINTEXT
68+
standby.sasl.mechanism = GSSAPI
69+
standby.sasl.kerberos.service.name = kafka
70+
standby.sasl.jaas.config = com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab="/kerby/keytabs/mm2.keytab" principal="mm2/mm2.example.com@EXAMPLE.COM";
71+
72+
primary->standby.enabled = true
73+
primary->standby.topics = .*
74+
replication.factor = 1
75+
checkpoints.topic.replication.factor = 1
76+
heartbeats.topic.replication.factor = 1
77+
offset-syncs.topic.replication.factor = 1
78+
```

kafka/templates/_helpers.tpl

Lines changed: 116 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,109 @@ app.kubernetes.io/instance: {{ .Release.Name }}
3838
{{- default (include "kafka.fullname" .) .Values.kafka.service.nameOverride -}}
3939
{{- end -}}
4040

41+
{{- define "kafka.effectiveClusters" -}}
42+
{{- if .Values.kafka.clusters -}}
43+
{{- toYaml .Values.kafka.clusters -}}
44+
{{- else -}}
45+
{{- toYaml (list (dict "name" "")) -}}
46+
{{- end -}}
47+
{{- end -}}
48+
49+
{{- define "kafka.cluster.fullname" -}}
50+
{{- $root := .root -}}
51+
{{- $cluster := .cluster -}}
52+
{{- if $cluster.name -}}
53+
{{- printf "%s-%s" (include "kafka.fullname" $root) $cluster.name | trunc 63 | trimSuffix "-" -}}
54+
{{- else -}}
55+
{{- include "kafka.fullname" $root -}}
56+
{{- end -}}
57+
{{- end -}}
58+
59+
{{- define "kafka.cluster.label" -}}
60+
{{- $cluster := .cluster -}}
61+
{{- default "default" $cluster.name | trunc 63 | trimSuffix "-" -}}
62+
{{- end -}}
63+
64+
{{- define "kafka.cluster.serviceName" -}}
65+
{{- $root := .root -}}
66+
{{- $cluster := .cluster -}}
67+
{{- if and $cluster.service $cluster.service.nameOverride -}}
68+
{{- $cluster.service.nameOverride -}}
69+
{{- else if and (not $cluster.name) $root.Values.kafka.service.nameOverride -}}
70+
{{- $root.Values.kafka.service.nameOverride -}}
71+
{{- else -}}
72+
{{- include "kafka.cluster.fullname" . -}}
73+
{{- end -}}
74+
{{- end -}}
75+
76+
{{- define "kafka.cluster.headlessServiceName" -}}
77+
{{- $root := .root -}}
78+
{{- $cluster := .cluster -}}
79+
{{- if and $cluster.headlessService $cluster.headlessService.nameOverride -}}
80+
{{- $cluster.headlessService.nameOverride -}}
81+
{{- else if and (not $cluster.name) $root.Values.kafka.headlessService.nameOverride -}}
82+
{{- $root.Values.kafka.headlessService.nameOverride -}}
83+
{{- else -}}
84+
{{- printf "%s-headless" (include "kafka.cluster.fullname" .) | trunc 63 | trimSuffix "-" -}}
85+
{{- end -}}
86+
{{- end -}}
87+
88+
{{- define "kafka.cluster.controllerQuorum" -}}
89+
{{- $root := .root -}}
90+
{{- $cluster := .cluster -}}
91+
{{- $fullname := include "kafka.cluster.fullname" . -}}
92+
{{- $headless := include "kafka.cluster.headlessServiceName" . -}}
93+
{{- $namespace := include "kafka.namespace" $root -}}
94+
{{- $domain := include "kafka.clusterDomain" $root -}}
95+
{{- $port := int (default $root.Values.kafka.ports.controller.containerPort (dig "ports" "controller" "containerPort" nil $cluster)) -}}
96+
{{- $replicas := int (default $root.Values.kafka.replicaCount $cluster.replicaCount) -}}
97+
{{- $offset := int (default $root.Values.kafka.brokerIdOffset $cluster.brokerIdOffset) -}}
98+
{{- $items := list -}}
99+
{{- range $i := until $replicas -}}
100+
{{- $items = append $items (printf "%d@%s-%d.%s.%s.svc.%s:%d" (add $i $offset) $fullname $i $headless $namespace $domain $port) -}}
101+
{{- end -}}
102+
{{- join "," $items -}}
103+
{{- end -}}
104+
105+
{{- define "kafka.cluster.bootstrapServers" -}}
106+
{{- $root := .root -}}
107+
{{- $cluster := .cluster -}}
108+
{{- $fullname := include "kafka.cluster.fullname" . -}}
109+
{{- $headless := include "kafka.cluster.headlessServiceName" . -}}
110+
{{- $namespace := include "kafka.namespace" $root -}}
111+
{{- $domain := include "kafka.clusterDomain" $root -}}
112+
{{- $port := int (default $root.Values.kafka.ports.client.containerPort (dig "ports" "client" "containerPort" nil $cluster)) -}}
113+
{{- $replicas := int (default $root.Values.kafka.replicaCount $cluster.replicaCount) -}}
114+
{{- $items := list -}}
115+
{{- range $i := until $replicas -}}
116+
{{- $items = append $items (printf "%s-%d.%s.%s.svc.%s:%d" $fullname $i $headless $namespace $domain $port) -}}
117+
{{- end -}}
118+
{{- join "," $items -}}
119+
{{- end -}}
120+
121+
{{- define "kafka.cluster.bootstrapServersWithProtocol" -}}
122+
{{- $root := .root -}}
123+
{{- $cluster := .cluster -}}
124+
{{- $protocol := default $root.Values.kafka.interBrokerListenerName $cluster.interBrokerListenerName -}}
125+
{{- $items := list -}}
126+
{{- range $server := splitList "," (include "kafka.cluster.bootstrapServers" .) -}}
127+
{{- $items = append $items (printf "%s://%s" $protocol $server) -}}
128+
{{- end -}}
129+
{{- join "," $items -}}
130+
{{- end -}}
131+
132+
{{- define "kafka.cluster.keytabPattern" -}}
133+
{{- $root := .root -}}
134+
{{- $cluster := .cluster -}}
135+
{{- if and $cluster.kerberos $cluster.kerberos.keytabPattern -}}
136+
{{- $cluster.kerberos.keytabPattern -}}
137+
{{- else if $cluster.name -}}
138+
{{- printf "%s/keytabs/kafka-%s-%%d.keytab" $root.Values.kerberos.mountPath $cluster.name -}}
139+
{{- else -}}
140+
{{- $root.Values.kafka.kerberos.keytabPattern -}}
141+
{{- end -}}
142+
{{- end -}}
143+
41144
{{- define "kafka.kerberosSecretName" -}}
42145
{{- default (printf "%s-kerberos" (include "kafka.fullname" .)) .Values.kerberos.existingSecret -}}
43146
{{- end -}}
@@ -56,38 +159,31 @@ app.kubernetes.io/instance: {{ .Release.Name }}
56159
{{- end -}}
57160

58161
{{- define "kafka.bootstrapServers" -}}
59-
{{- $fullname := include "kafka.fullname" . -}}
60-
{{- $headless := include "kafka.headlessServiceName" . -}}
61-
{{- $namespace := include "kafka.namespace" . -}}
62-
{{- $domain := include "kafka.clusterDomain" . -}}
63-
{{- $port := int .Values.kafka.ports.client.containerPort -}}
64-
{{- $items := list -}}
65-
{{- range $i := until (int .Values.kafka.replicaCount) -}}
66-
{{- $items = append $items (printf "%s-%d.%s.%s.svc.%s:%d" $fullname $i $headless $namespace $domain $port) -}}
67-
{{- end -}}
68-
{{- join "," $items -}}
162+
{{- $cluster := first (include "kafka.effectiveClusters" . | fromYamlArray) -}}
163+
{{- include "kafka.cluster.bootstrapServers" (dict "root" . "cluster" $cluster) -}}
69164
{{- end -}}
70165

71166
{{- define "kafka.bootstrapServersWithProtocol" -}}
72-
{{- $protocol := .Values.kafka.interBrokerListenerName -}}
73-
{{- $items := list -}}
74-
{{- range $server := splitList "," (include "kafka.bootstrapServers" .) -}}
75-
{{- $items = append $items (printf "%s://%s" $protocol $server) -}}
76-
{{- end -}}
77-
{{- join "," $items -}}
167+
{{- $cluster := first (include "kafka.effectiveClusters" . | fromYamlArray) -}}
168+
{{- include "kafka.cluster.bootstrapServersWithProtocol" (dict "root" . "cluster" $cluster) -}}
78169
{{- end -}}
79170

80171
{{- define "kafka.kdcExtraServicePrincipals" -}}
81-
{{- $fullname := include "kafka.fullname" . -}}
82-
{{- $headless := include "kafka.headlessServiceName" . -}}
172+
{{- $root := . -}}
83173
{{- $namespace := include "kafka.namespace" . -}}
84174
{{- $domain := include "kafka.clusterDomain" . -}}
85175
{{- $realm := .Values.kerberos.realm -}}
86176
{{- $items := list -}}
87177
{{- if .Values.kerberos.generateKafkaPrincipals -}}
88-
{{- range $i := until (int .Values.kafka.replicaCount) -}}
178+
{{- range $cluster := (include "kafka.effectiveClusters" . | fromYamlArray) -}}
179+
{{- $fullname := include "kafka.cluster.fullname" (dict "root" $root "cluster" $cluster) -}}
180+
{{- $headless := include "kafka.cluster.headlessServiceName" (dict "root" $root "cluster" $cluster) -}}
181+
{{- $replicas := int (default $root.Values.kafka.replicaCount $cluster.replicaCount) -}}
182+
{{- range $i := until $replicas -}}
89183
{{- $fqdn := printf "%s-%d.%s.%s.svc.%s" $fullname $i $headless $namespace $domain -}}
90-
{{- $items = append $items (printf "kafka/%s@%s:%s/kafka-%d.keytab" $fqdn $realm $.Values.kerberos.keytabsDir $i) -}}
184+
{{- $keytabName := ternary (printf "kafka-%s-%d.keytab" $cluster.name $i) (printf "kafka-%d.keytab" $i) (ne (default "" $cluster.name) "") -}}
185+
{{- $items = append $items (printf "kafka/%s@%s:%s/%s" $fqdn $realm $root.Values.kerberos.keytabsDir $keytabName) -}}
186+
{{- end -}}
91187
{{- end -}}
92188
{{- end -}}
93189
{{- range .Values.kerberos.extraServicePrincipals -}}

kafka/templates/kafbat-ui.yaml

Lines changed: 44 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -53,28 +53,62 @@ spec:
5353
value: "-Djava.security.krb5.conf={{ .Values.kerberos.krb5ConfigPath }}"
5454
- name: DYNAMIC_CONFIG_ENABLED
5555
value: {{ .Values.kafbatUi.dynamicConfigEnabled | quote }}
56-
{{- range $i, $cluster := .Values.kafbatUi.clusters }}
57-
- name: KAFKA_CLUSTERS_{{ $i }}_NAME
56+
{{- $kafkaClusters := include "kafka.effectiveClusters" . | fromYamlArray }}
57+
{{- $idx := 0 }}
58+
{{- if .Values.kafbatUi.autoClusters }}
59+
{{- range $kafkaCluster := $kafkaClusters }}
60+
- name: KAFKA_CLUSTERS_{{ $idx }}_NAME
61+
value: {{ default "default" $kafkaCluster.name | quote }}
62+
- name: KAFKA_CLUSTERS_{{ $idx }}_BOOTSTRAPSERVERS
63+
value: {{ include "kafka.cluster.bootstrapServers" (dict "root" $ "cluster" $kafkaCluster) | quote }}
64+
{{- if $.Values.schemaRegistry.enabled }}
65+
- name: KAFKA_CLUSTERS_{{ $idx }}_SCHEMAREGISTRY
66+
value: {{ printf "http://%s-schema-registry:%v" (include "kafka.fullname" $) $.Values.schemaRegistry.service.port | quote }}
67+
{{- end }}
68+
{{- if and $.Values.kerberos.enabled $.Values.kafbatUi.kerberos.enabled }}
69+
- name: KAFKA_CLUSTERS_{{ $idx }}_PROPERTIES_SECURITY_PROTOCOL
70+
value: SASL_PLAINTEXT
71+
- name: KAFKA_CLUSTERS_{{ $idx }}_PROPERTIES_SASL_MECHANISM
72+
value: GSSAPI
73+
- name: KAFKA_CLUSTERS_{{ $idx }}_PROPERTIES_SASL_KERBEROS_SERVICE_NAME
74+
value: {{ $.Values.kafka.saslKerberosServiceName | quote }}
75+
- name: KAFKA_CLUSTERS_{{ $idx }}_PROPERTIES_SASL_JAAS_CONFIG
76+
value: {{ printf "com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab=\"%s\" principal=\"%s\";" $.Values.kafbatUi.kerberos.keytab $.Values.kafbatUi.kerberos.principal | quote }}
77+
{{- end }}
78+
{{- $idx = add1 $idx }}
79+
{{- end }}
80+
{{- end }}
81+
{{- range $cluster := .Values.kafbatUi.clusters }}
82+
{{- $targetCluster := first $kafkaClusters }}
83+
{{- if $cluster.kafkaCluster }}
84+
{{- range $candidate := $kafkaClusters }}
85+
{{- if eq $candidate.name $cluster.kafkaCluster }}
86+
{{- $targetCluster = $candidate }}
87+
{{- end }}
88+
{{- end }}
89+
{{- end }}
90+
- name: KAFKA_CLUSTERS_{{ $idx }}_NAME
5891
value: {{ $cluster.name | quote }}
59-
- name: KAFKA_CLUSTERS_{{ $i }}_BOOTSTRAPSERVERS
60-
value: {{ default (include "kafka.bootstrapServers" $) $cluster.bootstrapServers | quote }}
92+
- name: KAFKA_CLUSTERS_{{ $idx }}_BOOTSTRAPSERVERS
93+
value: {{ default (include "kafka.cluster.bootstrapServers" (dict "root" $ "cluster" $targetCluster)) $cluster.bootstrapServers | quote }}
6194
{{- if $cluster.schemaRegistry }}
62-
- name: KAFKA_CLUSTERS_{{ $i }}_SCHEMAREGISTRY
95+
- name: KAFKA_CLUSTERS_{{ $idx }}_SCHEMAREGISTRY
6396
value: {{ $cluster.schemaRegistry | quote }}
6497
{{- else if $.Values.schemaRegistry.enabled }}
65-
- name: KAFKA_CLUSTERS_{{ $i }}_SCHEMAREGISTRY
98+
- name: KAFKA_CLUSTERS_{{ $idx }}_SCHEMAREGISTRY
6699
value: {{ printf "http://%s-schema-registry:%v" (include "kafka.fullname" $) $.Values.schemaRegistry.service.port | quote }}
67100
{{- end }}
68101
{{- if and $.Values.kerberos.enabled $.Values.kafbatUi.kerberos.enabled }}
69-
- name: KAFKA_CLUSTERS_{{ $i }}_PROPERTIES_SECURITY_PROTOCOL
102+
- name: KAFKA_CLUSTERS_{{ $idx }}_PROPERTIES_SECURITY_PROTOCOL
70103
value: SASL_PLAINTEXT
71-
- name: KAFKA_CLUSTERS_{{ $i }}_PROPERTIES_SASL_MECHANISM
104+
- name: KAFKA_CLUSTERS_{{ $idx }}_PROPERTIES_SASL_MECHANISM
72105
value: GSSAPI
73-
- name: KAFKA_CLUSTERS_{{ $i }}_PROPERTIES_SASL_KERBEROS_SERVICE_NAME
106+
- name: KAFKA_CLUSTERS_{{ $idx }}_PROPERTIES_SASL_KERBEROS_SERVICE_NAME
74107
value: {{ $.Values.kafka.saslKerberosServiceName | quote }}
75-
- name: KAFKA_CLUSTERS_{{ $i }}_PROPERTIES_SASL_JAAS_CONFIG
108+
- name: KAFKA_CLUSTERS_{{ $idx }}_PROPERTIES_SASL_JAAS_CONFIG
76109
value: {{ printf "com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab=\"%s\" principal=\"%s\";" $.Values.kafbatUi.kerberos.keytab $.Values.kafbatUi.kerberos.principal | quote }}
77110
{{- end }}
111+
{{- $idx = add1 $idx }}
78112
{{- end }}
79113
{{- range $name, $value := .Values.kafbatUi.env }}
80114
- name: {{ $name }}

0 commit comments

Comments
 (0)