Skip to content

Commit 73ec41e

Browse files
committed
feat(kafka): support of Apache Kafka Docker Hub images (apache/kafka and apache/kafka-native).
Signed-off-by: Marat Abrarov <abrarov@gmail.com>
1 parent ec51c67 commit 73ec41e

3 files changed

Lines changed: 273 additions & 78 deletions

File tree

modules/kafka/examples_test.go

Lines changed: 63 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,7 @@ import (
99
"github.com/testcontainers/testcontainers-go/modules/kafka"
1010
)
1111

12-
func ExampleRun() {
13-
// runKafkaContainer {
12+
func ExampleRun_confluentLocal() {
1413
ctx := context.Background()
1514

1615
kafkaContainer, err := kafka.Run(ctx,
@@ -26,7 +25,68 @@ func ExampleRun() {
2625
log.Printf("failed to start container: %s", err)
2726
return
2827
}
29-
// }
28+
29+
state, err := kafkaContainer.State(ctx)
30+
if err != nil {
31+
log.Printf("failed to get container state: %s", err)
32+
return
33+
}
34+
35+
fmt.Println(kafkaContainer.ClusterID)
36+
fmt.Println(state.Running)
37+
38+
// Output:
39+
// test-cluster
40+
// true
41+
}
42+
43+
func ExampleRun_apacheKafka() {
44+
ctx := context.Background()
45+
46+
kafkaContainer, err := kafka.Run(ctx,
47+
"apache/kafka:4.0.1",
48+
kafka.WithClusterID("test-cluster"),
49+
)
50+
defer func() {
51+
if err := testcontainers.TerminateContainer(kafkaContainer); err != nil {
52+
log.Printf("failed to terminate container: %s", err)
53+
}
54+
}()
55+
if err != nil {
56+
log.Printf("failed to start container: %s", err)
57+
return
58+
}
59+
60+
state, err := kafkaContainer.State(ctx)
61+
if err != nil {
62+
log.Printf("failed to get container state: %s", err)
63+
return
64+
}
65+
66+
fmt.Println(kafkaContainer.ClusterID)
67+
fmt.Println(state.Running)
68+
69+
// Output:
70+
// test-cluster
71+
// true
72+
}
73+
74+
func ExampleRun_apacheKafkaNative() {
75+
ctx := context.Background()
76+
77+
kafkaContainer, err := kafka.Run(ctx,
78+
"apache/kafka-native:4.0.1",
79+
kafka.WithClusterID("test-cluster"),
80+
)
81+
defer func() {
82+
if err := testcontainers.TerminateContainer(kafkaContainer); err != nil {
83+
log.Printf("failed to terminate container: %s", err)
84+
}
85+
}()
86+
if err != nil {
87+
log.Printf("failed to start container: %s", err)
88+
return
89+
}
3090

3191
state, err := kafkaContainer.State(ctx)
3292
if err != nil {

modules/kafka/kafka.go

Lines changed: 91 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -5,31 +5,36 @@ import (
55
"errors"
66
"fmt"
77
"math"
8+
"net"
89
"strconv"
910
"strings"
1011

1112
"github.com/docker/go-connections/nat"
1213
"golang.org/x/mod/semver"
1314

1415
"github.com/testcontainers/testcontainers-go"
16+
"github.com/testcontainers/testcontainers-go/log"
1517
"github.com/testcontainers/testcontainers-go/wait"
1618
)
1719

18-
const publicPort = nat.Port("9093/tcp")
1920
const (
20-
starterScript = "/usr/sbin/testcontainers_start.sh"
21-
22-
// starterScript {
23-
starterScriptContent = `#!/bin/bash
24-
source /etc/confluent/docker/bash-config
25-
export KAFKA_ADVERTISED_LISTENERS=%s,BROKER://%s:9092
26-
echo Starting Kafka KRaft mode
27-
sed -i '/KAFKA_ZOOKEEPER_CONNECT/d' /etc/confluent/docker/configure
28-
echo 'kafka-storage format --ignore-formatted -t "$(kafka-storage random-uuid)" -c /etc/kafka/kafka.properties' >> /etc/confluent/docker/configure
29-
echo '' > /etc/confluent/docker/ensure
30-
/etc/confluent/docker/configure
31-
/etc/confluent/docker/launch`
32-
// }
21+
controllerListenerLocalPort = 9094
22+
publicListenerLocalPort = 9093
23+
hostnameListenerLocalPort = 9092
24+
localhostListenerLocalPort = 9095
25+
starterScript = "/usr/sbin/testcontainers_start.sh"
26+
starterScriptContent = `#!/bin/bash
27+
export KAFKA_ADVERTISED_LISTENERS='%[2]s,BROKER://%[3]s,LOCALHOST://localhost:%[4]d'
28+
# For confluentinc/confluent-local image only
29+
if [ -d /etc/confluent/docker ]; then
30+
export KAFKA_REST_BOOTSTRAP_SERVERS="${KAFKA_LISTENERS}"
31+
sed -i '/KAFKA_ZOOKEEPER_CONNECT/d' /etc/confluent/docker/configure
32+
echo 'kafka-storage format --ignore-formatted -t "$(kafka-storage random-uuid)" -c /etc/kafka/kafka.properties' >> /etc/confluent/docker/configure
33+
echo '' > /etc/confluent/docker/ensure
34+
fi
35+
# Run original container entrypoint and command
36+
exec %[1]s
37+
`
3338
)
3439

3540
// KafkaContainer represents the Kafka container type used in the module
@@ -46,17 +51,30 @@ func RunContainer(ctx context.Context, opts ...testcontainers.ContainerCustomize
4651

4752
// Run creates an instance of the Kafka container type
4853
func Run(ctx context.Context, img string, opts ...testcontainers.ContainerCustomizer) (*KafkaContainer, error) {
54+
publicPort, err := nat.NewPort("tcp", strconv.Itoa(publicListenerLocalPort))
55+
if err != nil {
56+
return nil, fmt.Errorf("nat.NewPort: %w", err)
57+
}
58+
59+
dockerProvider, err := getDockerProvider(opts...)
60+
if err != nil {
61+
return nil, fmt.Errorf("getDockerProvider: %w", err)
62+
}
63+
4964
if err := validateKRaftVersion(img); err != nil {
5065
return nil, err
5166
}
5267

68+
kafkaListeners := fmt.Sprintf("PLAINTEXT://:%d,BROKER://:%d,CONTROLLER://:%d,LOCALHOST://localhost:%d",
69+
publicListenerLocalPort, hostnameListenerLocalPort, controllerListenerLocalPort, localhostListenerLocalPort)
70+
5371
moduleOpts := []testcontainers.ContainerCustomizer{
5472
testcontainers.WithExposedPorts(string(publicPort)),
5573
testcontainers.WithEnv(map[string]string{
5674
// envVars {
57-
"KAFKA_LISTENERS": "PLAINTEXT://0.0.0.0:9093,BROKER://0.0.0.0:9092,CONTROLLER://0.0.0.0:9094",
58-
"KAFKA_REST_BOOTSTRAP_SERVERS": "PLAINTEXT://0.0.0.0:9093,BROKER://0.0.0.0:9092,CONTROLLER://0.0.0.0:9094",
59-
"KAFKA_LISTENER_SECURITY_PROTOCOL_MAP": "BROKER:PLAINTEXT,PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT",
75+
"KAFKA_LISTENERS": kafkaListeners,
76+
"KAFKA_REST_BOOTSTRAP_SERVERS": kafkaListeners,
77+
"KAFKA_LISTENER_SECURITY_PROTOCOL_MAP": "BROKER:PLAINTEXT,PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT,LOCALHOST:PLAINTEXT",
6078
"KAFKA_INTER_BROKER_LISTENER_NAME": "BROKER",
6179
"KAFKA_BROKER_ID": "1",
6280
"KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR": "1",
@@ -72,15 +90,15 @@ func Run(ctx context.Context, img string, opts ...testcontainers.ContainerCustom
7290
}),
7391
testcontainers.WithEntrypoint("sh"),
7492
// this CMD will wait for the starter script to be copied into the container and then execute it
75-
testcontainers.WithCmd("-c", "while [ ! -f "+starterScript+" ]; do sleep 0.1; done; bash "+starterScript),
93+
testcontainers.WithCmd("-c", fmt.Sprintf("while [ ! -f %[1]q ]; do sleep 0.1; done; exec %[1]q", starterScript)),
7694
testcontainers.WithLifecycleHooks(testcontainers.ContainerLifecycleHooks{
7795
PostStarts: []testcontainers.ContainerHook{
7896
// Use a single hook to copy the starter script and wait for
7997
// the Kafka server to be ready. This prevents the wait running
8098
// if the starter script fails to copy.
8199
func(ctx context.Context, c testcontainers.Container) error {
82100
// 1. copy the starter script into the container
83-
if err := copyStarterScript(ctx, c); err != nil {
101+
if err := copyStarterScript(ctx, dockerProvider, c, publicPort); err != nil {
84102
return fmt.Errorf("copy starter script: %w", err)
85103
}
86104

@@ -122,25 +140,41 @@ func Run(ctx context.Context, img string, opts ...testcontainers.ContainerCustom
122140
}
123141

124142
// copyStarterScript copies the starter script into the container.
125-
func copyStarterScript(ctx context.Context, c testcontainers.Container) error {
143+
func copyStarterScript(ctx context.Context, dockerProvider *testcontainers.DockerProvider, c testcontainers.Container, publicPort nat.Port) error {
126144
if err := wait.ForMappedPort(publicPort).
127145
WaitUntilReady(ctx, c); err != nil {
128-
return fmt.Errorf("wait for mapped port: %w", err)
146+
return fmt.Errorf("wait for local port %s to be mapped: %w", publicPort, err)
129147
}
130148

131-
endpoint, err := c.PortEndpoint(ctx, publicPort, "PLAINTEXT")
149+
inspect, err := c.Inspect(ctx)
132150
if err != nil {
133-
return fmt.Errorf("port endpoint: %w", err)
151+
return fmt.Errorf("inspect: %w", err)
134152
}
135153

136-
inspect, err := c.Inspect(ctx)
154+
imageInspect, err := dockerProvider.Client().ImageInspect(ctx, inspect.Image)
137155
if err != nil {
138-
return fmt.Errorf("inspect: %w", err)
156+
return fmt.Errorf("image inspect: %w", err)
157+
}
158+
containerCmdParts := append(imageInspect.Config.Entrypoint, imageInspect.Config.Cmd...) //nolint:gocritic // New variable is needed.
159+
for i, s := range containerCmdParts {
160+
containerCmdParts[i] = strconv.Quote(s)
161+
}
162+
containerCmd := strings.Join(containerCmdParts, " ")
163+
164+
publicEndpoint, err := c.PortEndpoint(ctx, publicPort, "PLAINTEXT")
165+
if err != nil {
166+
return fmt.Errorf("port endpoint: %w", err)
139167
}
140168

141169
hostname := inspect.Config.Hostname
170+
brokerHostPort := net.JoinHostPort(hostname, strconv.Itoa(hostnameListenerLocalPort))
142171

143-
scriptContent := fmt.Sprintf(starterScriptContent, endpoint, hostname)
172+
scriptContent := fmt.Sprintf(starterScriptContent,
173+
containerCmd,
174+
publicEndpoint,
175+
brokerHostPort,
176+
localhostListenerLocalPort,
177+
)
144178

145179
if err := c.CopyToContainer(ctx, []byte(scriptContent), starterScript, 0o755); err != nil {
146180
return fmt.Errorf("copy to container: %w", err)
@@ -158,6 +192,11 @@ func WithClusterID(clusterID string) testcontainers.CustomizeRequestOption {
158192
// Brokers retrieves the broker connection strings from Kafka with only one entry,
159193
// defined by the exposed public port.
160194
func (kc *KafkaContainer) Brokers(ctx context.Context) ([]string, error) {
195+
publicPort, err := nat.NewPort("tcp", strconv.Itoa(publicListenerLocalPort))
196+
if err != nil {
197+
return nil, fmt.Errorf("nat.NewPort: %w", err)
198+
}
199+
161200
endpoint, err := kc.PortEndpoint(ctx, publicPort, "")
162201
if err != nil {
163202
return nil, err
@@ -184,14 +223,39 @@ func configureControllerQuorumVoters() testcontainers.CustomizeRequestOption {
184223
}
185224
}
186225

187-
req.Env["KAFKA_CONTROLLER_QUORUM_VOTERS"] = "1@" + host + ":9094"
226+
req.Env["KAFKA_CONTROLLER_QUORUM_VOTERS"] = fmt.Sprintf("1@%s:%d", host, controllerListenerLocalPort)
188227
}
189228

190229
return nil
191230
}
192231
// }
193232
}
194233

234+
func getDockerProvider(opts ...testcontainers.ContainerCustomizer) (*testcontainers.DockerProvider, error) {
235+
// Use a dummy request to get the provider from options.
236+
var req testcontainers.GenericContainerRequest
237+
for _, opt := range opts {
238+
if err := opt.Customize(&req); err != nil {
239+
return nil, err
240+
}
241+
}
242+
243+
logging := req.Logger
244+
if logging == nil {
245+
logging = log.Default()
246+
}
247+
genericProvider, err := req.ProviderType.GetProvider(testcontainers.WithLogger(logging))
248+
if err != nil {
249+
return nil, fmt.Errorf("get provider: %w", err)
250+
}
251+
252+
if dockerProvider, ok := genericProvider.(*testcontainers.DockerProvider); ok {
253+
return dockerProvider, nil
254+
}
255+
256+
return nil, fmt.Errorf("unknown provider type: %T", genericProvider)
257+
}
258+
195259
// validateKRaftVersion validates if the image version is compatible with KRaft mode,
196260
// which is available since version 7.0.0.
197261
func validateKRaftVersion(fqName string) error {

0 commit comments

Comments
 (0)