Skip to content

Commit fae58c8

Browse files
committed
feat(helm): add support for per-broker LoadBalancer services and external access configurations in Kafka
1 parent b6099d8 commit fae58c8

5 files changed

Lines changed: 195 additions & 12 deletions

File tree

kafka/README.md

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,40 @@ kafka:
4343
4444
With persistence disabled, the broker data path is still mounted, but it uses `emptyDir` and is lost when the pod is deleted.
4545

46+
## Per-broker LoadBalancer services
47+
48+
Kafka external access needs one stable address per broker. Enable per-broker LoadBalancer Services and make each broker advertise the matching external IP or DNS name:
49+
50+
```yaml
51+
kafka:
52+
externalBrokerServices:
53+
enabled: true
54+
advertisedHosts:
55+
- 203.0.113.10
56+
- 203.0.113.11
57+
```
58+
59+
For multi-cluster mode, configure the list per cluster:
60+
61+
```yaml
62+
kafka:
63+
clusters:
64+
- name: primary
65+
clusterId: MkU3OEVBNTcwNTJENDM2Qk
66+
externalBrokerServices:
67+
advertisedHosts:
68+
- 203.0.113.10
69+
- 203.0.113.11
70+
- name: standby
71+
clusterId: zlFiTJelTOuhnklFwLWixw
72+
externalBrokerServices:
73+
advertisedHosts:
74+
- 203.0.113.12
75+
- 203.0.113.13
76+
```
77+
78+
If your cloud provider assigns IPs dynamically, install once with external services enabled, read the assigned addresses, put those addresses into `advertisedHosts`, then run `helm upgrade`. For production, prefer reserving static IPs and setting `loadBalancerIPs` plus matching `advertisedHosts` up front when your provider supports `loadBalancerIP`.
79+
4680
## User-managed SASL secrets
4781

4882
For SASL/PLAIN, keep credentials in Kubernetes Secrets and point the chart at those secrets instead of putting JAAS strings in values files.

kafka/templates/NOTES.txt

Lines changed: 70 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,74 @@
11
Kafka has been installed as {{ include "kafka.fullname" . }}.
22

3-
Bootstrap service:
4-
{{ include "kafka.serviceName" . }}:{{ .Values.kafka.ports.client.servicePort }}
5-
6-
Broker DNS names:
7-
{{- $fullname := include "kafka.fullname" . }}
8-
{{- $headless := include "kafka.headlessServiceName" . }}
3+
{{- $root := . }}
94
{{- $namespace := include "kafka.namespace" . }}
10-
{{- $domain := include "kafka.clusterDomain" . }}
11-
{{- range $i := until (int .Values.kafka.replicaCount) }}
12-
{{ $fullname }}-{{ $i }}.{{ $headless }}.{{ $namespace }}.svc.{{ $domain }}:{{ $.Values.kafka.ports.client.servicePort }}
5+
{{- $clusters := include "kafka.effectiveClusters" . | fromYamlArray }}
6+
7+
Internal bootstrap endpoints:
8+
{{- range $cluster := $clusters }}
9+
{{- $ctx := dict "root" $root "cluster" $cluster }}
10+
{{- $clusterName := default "default" $cluster.name }}
11+
{{ $clusterName }}:
12+
{{ include "kafka.cluster.bootstrapServers" $ctx }}
13+
{{- end }}
14+
15+
{{- $globalExternal := .Values.kafka.externalBrokerServices.enabled }}
16+
{{- $hasExternal := false }}
17+
{{- range $cluster := $clusters }}
18+
{{- if (default $globalExternal (dig "externalBrokerServices" "enabled" nil $cluster)) }}
19+
{{- $hasExternal = true }}
20+
{{- end }}
21+
{{- end }}
22+
23+
{{- if $hasExternal }}
24+
25+
External broker LoadBalancer services:
26+
{{- range $cluster := $clusters }}
27+
{{- $ctx := dict "root" $root "cluster" $cluster }}
28+
{{- $clusterFullname := include "kafka.cluster.fullname" $ctx }}
29+
{{- $clusterName := default "default" $cluster.name }}
30+
{{- $replicas := int (default $root.Values.kafka.replicaCount $cluster.replicaCount) }}
31+
{{- if (default $globalExternal (dig "externalBrokerServices" "enabled" nil $cluster)) }}
32+
{{ $clusterName }}:
33+
{{- range $i := until $replicas }}
34+
{{ printf "%s-%d-external" $clusterFullname $i | trunc 63 | trimSuffix "-" }}
35+
{{- end }}
36+
{{- end }}
37+
{{- end }}
38+
39+
Check assigned LoadBalancer IPs/hostnames:
40+
kubectl -n {{ $namespace }} get svc -l app.kubernetes.io/instance={{ .Release.Name }},app.kubernetes.io/component=broker -o wide
41+
42+
If your LoadBalancer IPs were assigned dynamically, copy this upgrade command after the
43+
external services show an EXTERNAL-IP/hostname. It updates Kafka advertised.listeners
44+
to match the assigned per-broker addresses.
45+
46+
{{- if .Values.kafka.clusters }}
47+
helm upgrade {{ .Release.Name }} ./kafka -n {{ $namespace }} -f <your-values.yaml> \
48+
{{- range $clusterIndex, $cluster := $clusters }}
49+
{{- $ctx := dict "root" $root "cluster" $cluster }}
50+
{{- $clusterFullname := include "kafka.cluster.fullname" $ctx }}
51+
{{- $replicas := int (default $root.Values.kafka.replicaCount $cluster.replicaCount) }}
52+
{{- if (default $globalExternal (dig "externalBrokerServices" "enabled" nil $cluster)) }}
53+
{{- range $i := until $replicas }}
54+
--set-string 'kafka.clusters[{{ $clusterIndex }}].externalBrokerServices.advertisedHosts[{{ $i }}]'="$(kubectl -n {{ $namespace }} get svc {{ printf "%s-%d-external" $clusterFullname $i | trunc 63 | trimSuffix "-" }} -o jsonpath='{.status.loadBalancer.ingress[0].ip}{.status.loadBalancer.ingress[0].hostname}')" \
55+
{{- end }}
56+
{{- end }}
57+
{{- end }}
58+
--reuse-values
59+
{{- else }}
60+
{{- $cluster := first $clusters }}
61+
{{- $ctx := dict "root" $root "cluster" $cluster }}
62+
{{- $clusterFullname := include "kafka.cluster.fullname" $ctx }}
63+
{{- $replicas := int (default $root.Values.kafka.replicaCount $cluster.replicaCount) }}
64+
helm upgrade {{ .Release.Name }} ./kafka -n {{ $namespace }} -f <your-values.yaml> \
65+
{{- range $i := until $replicas }}
66+
--set-string 'kafka.externalBrokerServices.advertisedHosts[{{ $i }}]'="$(kubectl -n {{ $namespace }} get svc {{ printf "%s-%d-external" $clusterFullname $i | trunc 63 | trimSuffix "-" }} -o jsonpath='{.status.loadBalancer.ingress[0].ip}{.status.loadBalancer.ingress[0].hostname}')" \
67+
{{- end }}
68+
--reuse-values
69+
{{- end }}
70+
71+
For production, prefer reserving static LoadBalancer IPs and setting both
72+
externalBrokerServices.loadBalancerIPs and externalBrokerServices.advertisedHosts
73+
before the first install.
1374
{{- end }}

kafka/templates/kafka-statefulset.yaml

Lines changed: 67 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,17 @@
1515
{{- $clientPort := int (default $root.Values.kafka.ports.client.containerPort (dig "ports" "client" "containerPort" nil $cluster)) -}}
1616
{{- $clientServicePort := int (default $root.Values.kafka.ports.client.servicePort (dig "ports" "client" "servicePort" nil $cluster)) -}}
1717
{{- $clientProtocol := default $root.Values.kafka.ports.client.protocol (dig "ports" "client" "protocol" nil $cluster) -}}
18+
{{- $externalServicesEnabled := default $root.Values.kafka.externalBrokerServices.enabled (dig "externalBrokerServices" "enabled" nil $cluster) -}}
19+
{{- $externalServiceType := default $root.Values.kafka.externalBrokerServices.type (dig "externalBrokerServices" "type" nil $cluster) -}}
20+
{{- $externalListenerName := default $root.Values.kafka.externalBrokerServices.listenerName (dig "externalBrokerServices" "listenerName" nil $cluster) -}}
21+
{{- $externalContainerPort := int (default $root.Values.kafka.externalBrokerServices.containerPort (dig "externalBrokerServices" "containerPort" nil $cluster)) -}}
22+
{{- $externalServicePort := int (default $root.Values.kafka.externalBrokerServices.servicePort (dig "externalBrokerServices" "servicePort" nil $cluster)) -}}
23+
{{- $externalProtocol := default $root.Values.kafka.externalBrokerServices.protocol (dig "externalBrokerServices" "protocol" nil $cluster) -}}
24+
{{- $externalAnnotations := default $root.Values.kafka.externalBrokerServices.annotations (dig "externalBrokerServices" "annotations" nil $cluster) -}}
25+
{{- $externalTrafficPolicy := default $root.Values.kafka.externalBrokerServices.externalTrafficPolicy (dig "externalBrokerServices" "externalTrafficPolicy" nil $cluster) -}}
26+
{{- $externalSourceRanges := default $root.Values.kafka.externalBrokerServices.loadBalancerSourceRanges (dig "externalBrokerServices" "loadBalancerSourceRanges" nil $cluster) -}}
27+
{{- $externalLBIPs := default $root.Values.kafka.externalBrokerServices.loadBalancerIPs (dig "externalBrokerServices" "loadBalancerIPs" nil $cluster) -}}
28+
{{- $externalAdvertisedHosts := default $root.Values.kafka.externalBrokerServices.advertisedHosts (dig "externalBrokerServices" "advertisedHosts" nil $cluster) -}}
1829
{{- $controllerName := default $root.Values.kafka.ports.controller.name (dig "ports" "controller" "name" nil $cluster) -}}
1930
{{- $controllerPort := int (default $root.Values.kafka.ports.controller.containerPort (dig "ports" "controller" "containerPort" nil $cluster)) -}}
2031
{{- $controllerServicePort := int (default $root.Values.kafka.ports.controller.servicePort (dig "ports" "controller" "servicePort" nil $cluster)) -}}
@@ -80,6 +91,46 @@ spec:
8091
port: {{ $jmxServicePort }}
8192
targetPort: {{ $jmxName }}
8293
protocol: {{ $jmxProtocol }}
94+
{{- if $externalServicesEnabled }}
95+
{{- range $i := until $replicas }}
96+
---
97+
apiVersion: v1
98+
kind: Service
99+
metadata:
100+
name: {{ printf "%s-%d-external" $clusterName $i | trunc 63 | trimSuffix "-" }}
101+
labels:
102+
{{- include "kafka.labels" $root | nindent 4 }}
103+
app.kubernetes.io/component: broker
104+
app.kubernetes.io/part-of: {{ $clusterLabel }}
105+
statefulset.kubernetes.io/pod-name: {{ printf "%s-%d" $clusterName $i }}
106+
{{- with $externalAnnotations }}
107+
annotations:
108+
{{- toYaml . | nindent 4 }}
109+
{{- end }}
110+
spec:
111+
type: {{ $externalServiceType }}
112+
{{- with $externalTrafficPolicy }}
113+
externalTrafficPolicy: {{ . }}
114+
{{- end }}
115+
{{- with $externalSourceRanges }}
116+
loadBalancerSourceRanges:
117+
{{- toYaml . | nindent 4 }}
118+
{{- end }}
119+
{{- if lt $i (len $externalLBIPs) }}
120+
loadBalancerIP: {{ index $externalLBIPs $i | quote }}
121+
{{- end }}
122+
selector:
123+
{{- include "kafka.selectorLabels" $root | nindent 4 }}
124+
app.kubernetes.io/component: broker
125+
app.kubernetes.io/part-of: {{ $clusterLabel }}
126+
statefulset.kubernetes.io/pod-name: {{ printf "%s-%d" $clusterName $i }}
127+
ports:
128+
- name: {{ lower $externalListenerName | trunc 15 | trimSuffix "-" }}
129+
port: {{ $externalServicePort }}
130+
targetPort: {{ lower $externalListenerName | trunc 15 | trimSuffix "-" }}
131+
protocol: {{ $externalProtocol }}
132+
{{- end }}
133+
{{- end }}
83134
---
84135
apiVersion: apps/v1
85136
kind: StatefulSet
@@ -141,9 +192,17 @@ spec:
141192
export KAFKA_PROCESS_ROLES="{{ default $root.Values.kafka.processRoles $cluster.processRoles }}"
142193
export KAFKA_CONTROLLER_QUORUM_VOTERS="{{ include "kafka.cluster.controllerQuorum" $ctx }}"
143194
export KAFKA_CONTROLLER_LISTENER_NAMES="CONTROLLER"
144-
export KAFKA_LISTENER_SECURITY_PROTOCOL_MAP="{{ default $root.Values.kafka.listenerSecurityProtocolMap $cluster.listenerSecurityProtocolMap }}"
145-
export KAFKA_LISTENERS="SASL_PLAINTEXT://0.0.0.0:{{ $clientPort }},CONTROLLER://${POD_FQDN}:{{ $controllerPort }}"
146-
export KAFKA_ADVERTISED_LISTENERS="SASL_PLAINTEXT://${POD_FQDN}:{{ $clientPort }}"
195+
export KAFKA_LISTENER_SECURITY_PROTOCOL_MAP="{{ default $root.Values.kafka.listenerSecurityProtocolMap $cluster.listenerSecurityProtocolMap }}{{- if $externalServicesEnabled }},{{ $externalListenerName }}:SASL_PLAINTEXT{{- end }}"
196+
{{- if $externalServicesEnabled }}
197+
EXTERNAL_ADVERTISED_HOSTS="{{ join "," $externalAdvertisedHosts }}"
198+
EXTERNAL_HOST="$(echo "${EXTERNAL_ADVERTISED_HOSTS}" | cut -d, -f$((ORDINAL + 1)))"
199+
if [ -z "${EXTERNAL_HOST}" ]; then
200+
echo "Missing kafka.externalBrokerServices.advertisedHosts entry for ${HOSTNAME}" >&2
201+
exit 1
202+
fi
203+
{{- end }}
204+
export KAFKA_LISTENERS="SASL_PLAINTEXT://0.0.0.0:{{ $clientPort }}{{- if $externalServicesEnabled }},{{ $externalListenerName }}://0.0.0.0:{{ $externalContainerPort }}{{- end }},CONTROLLER://${POD_FQDN}:{{ $controllerPort }}"
205+
export KAFKA_ADVERTISED_LISTENERS="SASL_PLAINTEXT://${POD_FQDN}:{{ $clientPort }}{{- if $externalServicesEnabled }},{{ $externalListenerName }}://${EXTERNAL_HOST}:{{ $externalServicePort }}{{- end }}"
147206
export KAFKA_INTER_BROKER_LISTENER_NAME="{{ default $root.Values.kafka.interBrokerListenerName $cluster.interBrokerListenerName }}"
148207
export KAFKA_SASL_ENABLED_MECHANISMS="{{ default $root.Values.kafka.saslEnabledMechanisms $cluster.saslEnabledMechanisms }}"
149208
export KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL="{{ default $root.Values.kafka.saslMechanismInterBrokerProtocol $cluster.saslMechanismInterBrokerProtocol }}"
@@ -190,6 +249,11 @@ spec:
190249
- name: {{ $jmxName }}
191250
containerPort: {{ $jmxPort }}
192251
protocol: {{ $jmxProtocol }}
252+
{{- if $externalServicesEnabled }}
253+
- name: {{ lower $externalListenerName | trunc 15 | trimSuffix "-" }}
254+
containerPort: {{ $externalContainerPort }}
255+
protocol: {{ $externalProtocol }}
256+
{{- end }}
193257
env:
194258
- name: KAFKA_SUPER_USERS
195259
value: {{ join ";" (default $root.Values.kafka.superUsers $cluster.superUsers) | quote }}

kafka/values-sample.yaml

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,18 @@ kafka:
2929
- name: standby
3030
clusterId: zlFiTJelTOuhnklFwLWixw
3131
replicaCount: 2
32+
externalBrokerServices:
33+
enabled: true
34+
type: LoadBalancer
35+
# annotations: {}
36+
# externalTrafficPolicy: ""
37+
# loadBalancerSourceRanges: []
38+
# loadBalancerIPs: []
39+
# advertisedHosts: []
40+
# servicePort: 9092
41+
# containerPort: 19092
42+
# protocol: TCP
43+
# listenerName: EXTERNAL
3244

3345
schemaRegistry:
3446
enabled: true

kafka/values.yaml

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,18 @@ kafka:
8787
nameOverride: ""
8888
type: ClusterIP
8989
annotations: {}
90+
externalBrokerServices:
91+
enabled: false
92+
type: LoadBalancer
93+
annotations: {}
94+
externalTrafficPolicy: ""
95+
loadBalancerSourceRanges: []
96+
loadBalancerIPs: []
97+
advertisedHosts: []
98+
servicePort: 9092
99+
containerPort: 19092
100+
protocol: TCP
101+
listenerName: EXTERNAL
90102
headlessService:
91103
nameOverride: ""
92104
annotations: {}

0 commit comments

Comments
 (0)