Skip to content

Commit b785f4c

Browse files
committed
fix(test): resolve flaky Kafka Connect integration test
- Fix cleanup order: terminate containers before removing network - Replace log-based wait with HTTP health check for reliability - Pin connectors image to digest for reproducibility - Add log consumer for debugging container failures - Upgrade Redpanda version to v25.2.1 - Remove WithAttachable flag for consistency
1 parent 9321b19 commit b785f4c

3 files changed

Lines changed: 32 additions & 19 deletions

File tree

backend/pkg/api/handle_kafka_connect_integration_test.go

Lines changed: 19 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323
"github.com/docker/go-connections/nat"
2424
"github.com/stretchr/testify/assert"
2525
"github.com/stretchr/testify/require"
26+
"github.com/testcontainers/testcontainers-go"
2627
"github.com/testcontainers/testcontainers-go/modules/redpanda"
2728
"github.com/testcontainers/testcontainers-go/network"
2829

@@ -42,14 +43,11 @@ func (s *APIIntegrationTestSuite) TestHandleCreateConnector() {
4243
ctx := t.Context()
4344

4445
// create one common network that all containers will share
45-
testNetwork, err := network.New(ctx, network.WithAttachable())
46+
testNetwork, err := network.New(ctx)
4647
require.NoError(err)
47-
t.Cleanup(func() {
48-
assert.NoError(testNetwork.Remove(context.Background()))
49-
})
5048

5149
redpandaContainer, err := redpanda.Run(ctx,
52-
"redpandadata/redpanda:v23.3.18",
50+
"redpandadata/redpanda:v25.2.1",
5351
network.WithNetwork([]string{"redpanda"}, testNetwork),
5452
redpanda.WithListener("redpanda:29092"),
5553
)
@@ -58,15 +56,29 @@ func (s *APIIntegrationTestSuite) TestHandleCreateConnector() {
5856
seedBroker, err := redpandaContainer.KafkaSeedBroker(ctx)
5957
require.NoError(err)
6058

59+
// Log consumer for debugging container failures
60+
logConsumer := &testutil.TestContainersLogger{
61+
LogPrefix: "[kafka-connect] ",
62+
}
63+
6164
// Kafka KafkaConnect container
62-
connectC, err := testutil.RunRedpandaConnectorsContainer(
65+
connectContainer, err := testutil.RunRedpandaConnectorsContainer(
6366
ctx,
6467
[]string{"redpanda:29092"},
6568
network.WithNetwork([]string{"kafka-connect"}, testNetwork),
69+
testcontainers.WithLogConsumers(logConsumer),
6670
)
6771
require.NoError(err)
6872

69-
connectContainer := connectC
73+
// Register cleanup in correct order: containers first, then network
74+
t.Cleanup(func() {
75+
cleanupCtx := context.Background()
76+
// Terminate containers first (LIFO: registered last, runs first)
77+
_ = connectContainer.Terminate(cleanupCtx)
78+
_ = redpandaContainer.Terminate(cleanupCtx)
79+
// Then remove network
80+
_ = testNetwork.Remove(cleanupCtx)
81+
})
7082

7183
connectPort, err := connectContainer.MappedPort(ctx, nat.Port("8083"))
7284
require.NoError(err)
@@ -104,11 +116,6 @@ func (s *APIIntegrationTestSuite) TestHandleCreateConnector() {
104116
s.api.ConnectSvc = oldConnectSvc
105117
}()
106118

107-
t.Cleanup(func() {
108-
assert.NoError(connectContainer.Terminate(context.Background()))
109-
assert.NoError(redpandaContainer.Terminate(context.Background()))
110-
})
111-
112119
t.Run("happy path", func(t *testing.T) {
113120
ctx, cancel := context.WithTimeout(t.Context(), 30*time.Second)
114121
defer cancel()

backend/pkg/testutil/kafkaconnect.go

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,8 @@ func RunRedpandaConnectorsContainer(ctx context.Context, bootstrapServers []stri
3838

3939
request := testcontainers.GenericContainerRequest{
4040
ContainerRequest: testcontainers.ContainerRequest{
41-
Image: "docker.cloudsmith.io/redpanda/connectors-unsupported/connectors:latest",
41+
// Pin to digest for reproducibility - :latest tag can have variable behavior
42+
Image: "docker.cloudsmith.io/redpanda/connectors-unsupported/connectors@sha256:0ff21e793ef3042f2f48fb3d6549fae0ef687950a76b8017ab9d8b17c33cadb0",
4243
ExposedPorts: []string{"8083/tcp"},
4344
Env: map[string]string{
4445
"CONNECT_CONFIGURATION": testConnectConfig,
@@ -47,11 +48,15 @@ func RunRedpandaConnectorsContainer(ctx context.Context, bootstrapServers []stri
4748
"CONNECT_HEAP_OPTS": "-Xms512M -Xmx512M",
4849
"CONNECT_LOG_LEVEL": "info",
4950
},
50-
WaitingFor: wait.ForAll(
51-
wait.ForLog("Kafka Connect started").
52-
WithPollInterval(500 * time.Millisecond).
53-
WithStartupTimeout(waitTimeout),
54-
),
51+
// Use HTTP health check instead of log-based wait for better reliability
52+
// and early failure detection if the container exits
53+
WaitingFor: wait.ForHTTP("/connectors").
54+
WithPort("8083/tcp").
55+
WithStatusCodeMatcher(func(status int) bool {
56+
return status == 200
57+
}).
58+
WithPollInterval(1 * time.Second).
59+
WithStartupTimeout(waitTimeout),
5560
},
5661
Started: true,
5762
}

frontend/tests/shared/global-setup.mjs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -240,7 +240,8 @@ topic.creation.enable=false
240240

241241
let connect;
242242
try {
243-
connect = await new GenericContainer('docker.cloudsmith.io/redpanda/connectors-unsupported/connectors:latest')
243+
// Pin to digest for reproducibility - :latest tag can have variable behavior
244+
connect = await new GenericContainer('docker.cloudsmith.io/redpanda/connectors-unsupported/connectors@sha256:0ff21e793ef3042f2f48fb3d6549fae0ef687950a76b8017ab9d8b17c33cadb0')
244245
.withPlatform('linux/amd64')
245246
.withNetwork(network)
246247
.withNetworkAliases('connect')

0 commit comments

Comments
 (0)