Skip to content

test: converting test suite to TestContainers#798

Open
j7nw4r wants to merge 57 commits into
fede1024:masterfrom
j7nw4r:j7nw4r/testing-update
Open

test: converting test suite to TestContainers#798
j7nw4r wants to merge 57 commits into
fede1024:masterfrom
j7nw4r:j7nw4r/testing-update

Conversation

@j7nw4r
Copy link
Copy Markdown
Contributor

@j7nw4r j7nw4r commented Oct 2, 2025

Migrate the integration tests from the Bitnami Kafka docker-compose to testcontainers-rs, and substantially extend the suite's binding coverage.

Testcontainers migration

Tests now spin up the broker through testcontainers-modules' apache Kafka image, with a single KafkaContext shared across tests in a binary via a tokio::sync::OnceCell (see tests/utils/containers.rs). cargo test works with no out-of-band setup beyond a reachable Docker daemon. The CI test matrix fans out across KAFKA_VERSION = 3.7 / 3.8 / 3.9 / 4.0 and resolves each row to a specific apache/kafka:<tag> image.

The migration also reorganised the integration tests into per-area files under tests/, with shared helpers in tests/utils/. A few assertions had to flex for the new environment:

  • tests/metadata.rs uses BROKER_ID = 1 (the image hardcodes it) and drops the port == 9092 check (testcontainers maps to a random host port).
  • tests/base_producer.rs::test_base_producer_timeout points the producer at 127.0.0.1:1 so the 100ms message.timeout.ms is exercised reliably; Kafka 4.0 with auto.create.topics.enable=true was fast enough on CI to deliver the message before the timeout fired.
  • tests/consumer_groups.rs::test_delete_unknown_group accepts either GroupIdNotFound or NotCoordinator, since coordinator state on a cold broker is non-deterministic when this test runs first in the binary.

The single-broker testcontainers image also needs KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=1 and KAFKA_TRANSACTION_STATE_LOG_MIN_ISR=1; without those, the transactions tests hang trying to bring __transaction_state up to its default RF of 3.

testcontainers-modules is pinned at 0.12.1. Bumping to 0.13 pulls in testcontainers 0.25 -> tonic 0.14 -> rustc 1.88, which is past CI's rust_version: 1.85 floor. Happy to bump it in a follow-up if you'd rather.

New integration-test coverage

The testcontainers swap above is a straight port of the pre-existing integration tests. On top of that, this PR adds new tests that extend coverage to binding surfaces the upstream suite didn't reach.

  • Producer (tests/base_producer.rs, tests/future_producer.rs): compression round-trip per codec (gzip / snappy / lz4 / zstd; zstd is feature-gated and CI runs cargo test --features zstd so the codec actually links), idempotent producer with mid-stream flush(), MessageSizeTooLarge on an oversized BaseProducer::send, default partitioner key-to-partition determinism, BorrowedMessage::headers round-trip on the consumer side, Producer::flush drains in-flight.
  • Consumer (tests/base_consumer.rs, tests/stream_consumers.rs): pre/post rebalance callbacks fire with the expected partition sets, offsets_for_times returns the first offset at-or-after, regex subscription assigns only matching topics, cooperative-sticky rebalance_protocol() reports Cooperative after a group join.
  • Admin (tests/admin.rs, tests/consumer_groups.rs): topic-scoped alter_configs readback with ConfigSource::DynamicTopic, create_topics Fixed(3) returns InvalidReplicationFactor on a single broker, delete_groups returns NonEmptyGroup while the consumer is alive and succeeds after the consumer drops.
  • Transactions (tests/transactions.rs): send_offsets_to_transaction end-to-end on a consume-transform-produce loop, isolation.level filters interleaved aborted/committed payloads, producer fencing by epoch (second init_transactions with the same transactional.id fences the first's commit_transaction).
  • Observability (tests/base_producer.rs): ClientContext::stats fires on the statistics.interval.ms cadence and deserialises into the Statistics struct.

For reviewers

There's a walkthrough of the integration suite at tests/README.md on this branch. It covers the shared-broker pattern, the helper cheatsheet under tests/utils/, how to add a new test, the CI matrix, and a "known quirks" section for the non-obvious assertion shapes (the test_base_producer_timeout unreachable-broker trick, the test_delete_unknown_group either-error tolerance, the 3.7 wakeup-callback baseline). It's the fastest way to context-switch into the new test suite.

@j7nw4r j7nw4r changed the title Basic containers.rs test module. Basic KafkaContex. Converting test suite to TestContainers. Oct 2, 2025
@j7nw4r j7nw4r force-pushed the j7nw4r/testing-update branch from 942aa9d to a238463 Compare October 2, 2025 19:31
@j7nw4r j7nw4r force-pushed the j7nw4r/testing-update branch from 3d96014 to b4a0ac5 Compare October 2, 2025 20:30
@j7nw4r j7nw4r force-pushed the j7nw4r/testing-update branch from faa81f0 to bef9136 Compare October 2, 2025 21:17
@j7nw4r j7nw4r force-pushed the j7nw4r/testing-update branch from 3c354d3 to 5684312 Compare October 17, 2025 22:04
Within a test binary, tests share one KafkaContext via OnceCell and
default to running in parallel, so the broker's consumer-group
coordinator state when test_delete_unknown_group runs depends on
whether another test in the binary has touched a group first. A cold
broker returns NotCoordinator; a warmed broker returns GroupIdNotFound.
Both are correct "the group does not exist" responses, so accept
either rather than relying on test ordering.
@j7nw4r j7nw4r marked this pull request as ready for review May 21, 2026 23:11
@j7nw4r j7nw4r marked this pull request as draft May 22, 2026 13:14
j7nw4r added 25 commits May 22, 2026 08:19
Each CI matrix row passed KAFKA_VERSION (3.7-4.0), but the testcontainers
init was using the crate's hard-coded default tag for every run, so the
matrix was redundant. Resolve the env var into a specific apache/kafka
tag at startup and use the JVM image variant (the kafka-native variant
doesn't publish 3.7.x tags).
Add a junior-engineer walkthrough of the testcontainers-based suite
under tests/README.md, covering: prerequisites, the shared KafkaContext
/ OnceCell pattern, a worked example of writing a new test, the helper
cheatsheet, known quirks (transactions RF override, the unreachable-
broker timeout, GroupIdNotFound vs NotCoordinator), the CI matrix, and
troubleshooting. Replace the stale "Tests" section in CONTRIBUTING.md
with a pointer to it; the old section still described
./test_suite.sh + KAFKA_HOST, which the testcontainers swap obsoleted.

Drafting the doc surfaced dead code in tests/utils/mod.rs that was
left over from the docker-compose era:

- get_bootstrap_server() read the now-unset KAFKA_HOST env var.
- populate_topic() had no live callers (all sites were commented out).
- create_topic(name, partitions) was shadowed by the new
  admin::create_topic helper.
- ProducerTestContext was only referenced from the dead populate_topic.
- consumer_config() had one live caller (test_invalid_max_poll_interval)
  and was the only thing still wired to get_bootstrap_server.

Remove the first four; give consumer_config an explicit
bootstrap_servers parameter and update the caller to pass
KafkaContext::shared().bootstrap_servers. Also drop two leftover
commented-out populate_topic lines in tests/stream_consumers.rs that
sat next to their live replacements.

No behavior change; cargo fmt / clippy / clippy --tests stay clean.
The integration tests own their broker through testcontainers now, so
test_suite.sh's docker-compose orchestration is no longer load-bearing:
the Bitnami broker it spun up was never reached by any test in the new
suite. Inline the only step that matters (cargo test, with KAFKA_VERSION
passed through) into the CI test job and delete the surrounding files:

- delete docker-compose.yaml (Bitnami Kafka container; unused).
- delete test_suite.sh (its only live work was `cargo test` plus the
  smol / async-std example smoke runs; see below).
- the explicit `git submodule update --init` it ran is redundant with
  rdkafka-sys's build.rs, which initialises the librdkafka submodule
  itself. The other CI jobs already rely on this.

One coverage item drops with the script: the smol and async-std runtime
examples used to be invoked at the end of test_suite.sh as e2e smoke
tests against the docker-compose broker. The `check` job still builds
them via `cargo build --all-targets`, so compile coverage is preserved;
runtime coverage is not. If we want it back it belongs in its own job
(a service-container broker plus two `cargo run --example ...` steps)
rather than entangled with the integration suite.

Also drop the matching "easy follow-up" note from tests/README.md.
Two dangling pieces of the pre-testcontainers test infrastructure that
nothing in the tree now references:

- Dockerfile: copied a docker/run_tests.sh that doesn't exist, set
  KAFKA_HOST=kafka:9092 (a dead env var after the testcontainers swap),
  and pinned Rust 1.74 (the MSRV is 1.85). No build, workflow, or
  README in the repo invokes it.

- rdkafka.suppressions: valgrind suppression file. Its only caller was
  the run_with_valgrind helper inside the just-deleted test_suite.sh,
  and that helper was itself only reachable from an already-commented-
  out block.
kcov wrapper that hasn't kept up with the codebase. It globs
target/debug/test_* for integration test binaries, which matched the
pre-PR-798 layout (test_admin.rs, test_consumer_*) but matches nothing
now that integration tests live in per-area files (admin.rs,
base_producer.rs, etc.).

Nothing in CI or the repo's docs invokes it; the breakage went
unnoticed, which is a fair signal that no one is actually running it.
Modern alternative is `cargo llvm-cov` if someone wants coverage
locally; easier to reach for than maintaining a bespoke kcov script.
Bring back the e2e smoke test for the alternative runtime examples
that test_suite.sh used to run. Putting it in its own job, with a
service-container broker, instead of re-entangling it with the
integration suite:

- the integration test job uses testcontainers and a random host
  port, so it can't host examples that hardcode localhost:9092;
- the examples are runtime-correctness checks for smol and async-std,
  not broker-compatibility checks, so a single pinned Kafka version is
  enough and matrixing across 3.7-4.0 would be wasted CI time.

The service container is apache/kafka:4.0.2 in KRaft mode with the
same single-broker overrides we set in tests/utils/containers.rs
(transaction-state-log RF/ISR = 1), exposed on localhost:9092 with a
kafka-topics.sh-based healthcheck so the runner waits for the broker
before launching cargo. The examples produce / consume one message
each and exit non-zero on failure.

Verified locally that both examples compile with
`--no-default-features --features cmake-build`. The check job's
cargo build --all-targets continues to catch pure compile breakage;
this job catches anything that only manifests at runtime.
test_produce_consume_message_queue_nonempty_callback asserts wakeup
counts on a split_partition_queue, with the partition assigned at
Offset::Beginning. On apache/kafka 3.7.x (JVM image), librdkafka's
initial log-start-offset query posts an event to that queue during
setup, which invokes the nonempty callback once before any messages
have been produced. 3.8 / 3.9 / 4.0 don't show this. The old Bitnami
3.7 image apparently didn't either, which is why the test passed on
the pre-matrix-fix CI matrix that was always pulling the 4.x default.

Capture the wakeup count after initial setup as a baseline and assert
deltas (baseline + N) instead of absolute counts. The test still
verifies the callback's actual contract -- one wakeup per empty-to-
non-empty transition -- without depending on broker-version-specific
startup events.
Two doc-sync items missed in the preceding commits:

- The CI section listed four jobs; add the new runtime-examples job
  introduced for the smol / async-std example smoke tests.
- The Known quirks section gains an entry for the apache/kafka 3.7
  startup wakeup on the split partition queue, mirroring the test
  comment in test_produce_consume_message_queue_nonempty_callback.
Adds four FutureProducer round-trip tests in tests/future_producer.rs,
one per librdkafka-supported compression codec. Each test produces 64
messages with a non-trivial payload through a producer configured with
`compression.type=<codec>`, then consumes them back through a stream
consumer and asserts every key/value pair survives the codec.

The zstd test is gated on `#[cfg(feature = "zstd")]` because
rdkafka-sys's build.rs configures librdkafka with `--disable-zstd`
unless the `zstd` Cargo feature is enabled. The CI integration-test
job is updated to pass `--features zstd` so the matrix actually
exercises that codec.

Adds a `create_producer_with_overrides` helper to
tests/utils/producer/future_producer.rs and documents it (plus the
new `--features zstd` test invocation) in tests/README.md.
Adds `test_future_producer_idempotence` in tests/future_producer.rs.
It opens a FutureProducer with `enable.idempotence=true`, sends 1000
distinct payloads in two batches separated by a `flush()`, then
consumes them back through a stream consumer and asserts the result
has exactly 1000 distinct payloads on offsets 0..999. That is the
exactly-once contract librdkafka promises for the idempotent producer,
and the binding has to wire the implicit `acks=all` / retries /
in-flight cap / sequence-number tracking correctly for the test to
pass.

A true "forced reconnect midway" requires either a TCP proxy or a
privileged in-process disconnect; both are out of scope for the
testcontainer setup. The flush boundary is a stand-in that exercises
the producer's recovery from a fully-drained pipeline.
Adds `test_consumer_rebalance_callbacks` in tests/base_consumer.rs.
The test wires a `RecordingRebalanceContext` that captures every
pre_rebalance and post_rebalance event with the affected partitions,
creates a topic with two partitions, drives consumer 1 until both
partitions are assigned, then attaches consumer 2 to the same group
and drives both consumers until the group converges to one partition
each.

Assertions then verify that consumer 1 saw a post-rebalance Assign
event with both partitions followed by a Revoke (when consumer 2
joined), and that consumer 2 saw a post-rebalance Assign event with
exactly one partition. A binding regression that drops one of those
callbacks would cause this test to time out or fail the kind/count
check.
…e loop

`test_transaction_commit` already calls `send_offsets_to_transaction`,
but it produces a fixed `A` payload regardless of what was consumed and
only inspects the consumer's committed offset at the end. Adds
`test_transaction_send_offsets_consume_transform_produce` in
tests/transactions.rs that does a real consume-transform-produce loop
in two transactions: each transaction reads a batch, transforms each
payload, produces the transformed payload to the output topic, ties
the consumer offset advance to the transaction via
`send_offsets_to_transaction`, and commits.

Assertions then check that the consumer's committed offset equals the
total number of consumed messages (it advanced exactly once per batch,
in lockstep with the transaction) and that the output topic, read with
`isolation.level=read_committed`, contains the transformed payloads in
order. A binding regression on `send_offsets_to_transaction` would
either skip an offset advance or write the wrong payload through the
transactional producer.
Adds `test_base_producer_message_too_large` in tests/base_producer.rs.
The test configures a BaseProducer with `message.max.bytes=1024`, calls
`send` with a 4096-byte payload, and asserts that the synchronous
return is
`Err((KafkaError::MessageProduction(RDKafkaErrorCode::MessageSizeTooLarge), _))`.
It then sends a baseline small payload and flushes, and asserts the
collecting context observed exactly that one delivery, so a binding
regression that swallowed the oversized error or silently queued the
oversized message would surface as either the wrong error variant or
an unexpected second delivery.
Adds `test_future_producer_default_partitioner_is_deterministic` in
tests/future_producer.rs. With a six-partition topic and a four-key
working set, the test produces 16 copies of each key (no explicit
partition) and verifies every copy of a given key landed on the same
partition. It also asserts the four keys span at least two distinct
partitions, so a regression that collapsed everything onto partition 0
would also fail. A binding break that dropped or rewrote the key on
the way into librdkafka would scatter the same key across partitions
and surface here.
Adds `test_consumer_reads_message_headers` in tests/stream_consumers.rs.
The existing `test_base_producer_headers` only validates the produce
side via the delivery callback; this test produces a message with a
mixed header set (a byte-valued header, a str-valued header, an empty
header, and an explicitly null header), consumes it through a
StreamConsumer, and verifies that `BorrowedMessage::headers` exposes
each header in order with the matching key, value type, and value
bytes. A binding regression in `BorrowedMessage::headers` or the
underlying headers FFI conversion would surface here.
Adds `test_base_producer_flush_drains_inflight` in tests/base_producer.rs.
The test queues 200 records on a BaseProducer with `linger.ms=100`
so that they stay buffered, asserts that `in_flight_count()` is
non-zero immediately after the send loop, then calls `flush(20s)` and
asserts both that `in_flight_count()` drops to zero and that the
collecting context received exactly 200 successful delivery callbacks.
A binding regression that returned from `flush` before the queue had
drained, or misreported the in-flight count, would surface either as a
non-zero counter or a short delivery callback list.
Adds `test_consumer_offsets_for_times_first_at_or_after` in
tests/stream_consumers.rs. The existing
`test_produce_consume_with_timestamp` exercises `offsets_for_timestamp`
but only with two distinct timestamp values; this test produces 20
messages with a strictly monotonic per-message timestamp and then
queries at three points: an exact-match timestamp, a midpoint between
two adjacent message timestamps, and a timestamp past the high
watermark. The midpoint case asserts the returned offset is the first
message at-or-after the query (offset 8 for a query between offsets 7
and 8); the past-watermark case asserts `Offset::End`. A binding
regression that returned the nearest-neighbour offset or that mangled
the timestamp/offset TPL serialization would fail one of those
assertions.
Adds `test_consumer_regex_subscription_matches_only_prefixed` in
tests/stream_consumers.rs. The test creates two topics that match a
unique `^<rand>.*` pattern and one topic that does not, produces a
message into each, subscribes a StreamConsumer to the regex, and
drives the stream until the consumer's assignment stabilises. It
then asserts both that the assignment is exactly the two matching
topics and that the non-matching topic was never delivered to the
stream. A binding regression that dropped the leading `^` or that
crossed up topic ownership during the regex resolve would surface
here.
…-sticky

Adds `test_consumer_cooperative_sticky_rebalance_protocol` in
tests/stream_consumers.rs. The existing
`test_produce_consume_base_incremental_assign_and_unassign` exercises
the incremental_assign/incremental_unassign API on a manually-assigned
consumer (no group join, so rebalance_protocol stays None). This test
joins a group with `partition.assignment.strategy=cooperative-sticky`,
drives the consumer until both partitions are assigned, and asserts
that `rebalance_protocol()` returns `None` before the join and
`Cooperative` after. A binding regression in the
`rebalance_protocol` accessor (or that dropped the cooperative-sticky
configuration) would surface here.
Adds `test_alter_topic_configs_retention_ms_dynamic_topic` in
tests/admin.rs. `test_configs` already covers broker-scoped
alter_configs; this test covers the topic-scoped path by creating a
fresh topic, altering `retention.ms` with
`AlterConfig::new(ResourceSpecifier::Topic(...))`, then issuing
`describe_configs` and asserting the entry value updated to the new
number and that its `source` switched to `ConfigSource::DynamicTopic`.
A binding regression that misrouted a topic-scoped AlterConfigs
request to a broker handler, or that misclassified the readback
source, would fail one of those assertions.
Adds `test_create_topics_fixed_replication_too_high` in tests/admin.rs.
`test_incorrect_replication_factors_are_ignored_when_creating_topics`
already exercises the client-side validation path for a
`TopicReplication::Variable` mismatch. This test covers the broker-side
path: it asks for `TopicReplication::Fixed(3)` against the single-broker
container, expects the broker to reject the request, and pins the
surfaced error to `RDKafkaErrorCode::InvalidReplicationFactor`. A
binding regression that swallowed the per-topic error or remapped the
code would fail one of the assertions.
…drop

Adds `test_delete_non_empty_consumer_group` in tests/consumer_groups.rs.
The test creates a topic and a `BaseConsumer` in a random group,
subscribes the consumer and polls it until it has actually joined the
group (assignment is non-empty), calls `delete_groups`, and asserts the
per-group result is `Err((<group>, NonEmptyGroup))`. It then drops the
consumer (LeaveGroup) and retries `delete_groups` until the broker
reports success or the 30s deadline elapses. A binding regression that
misclassified the per-group error, or that failed to retry once the
member had left, would surface here.
… payloads

Adds `test_transaction_isolation_level_filters_aborted` in
tests/transactions.rs. `test_transaction_abort` and
`test_transaction_commit` each cover a single transaction in
isolation, but they never interleave committed and aborted records
on the same topic-partition. This test runs two transactional
producers (different `transactional.id`) into the same topic: one
commits 7 payloads, the other aborts 5. A `read_committed` consumer
must see exactly the 7 committed payloads in order, and a
`read_uncommitted` consumer must see all 12 payloads (committed
followed by aborted). A binding regression on the `isolation.level`
plumbing, or a librdkafka filter break that included or dropped
aborted records, would surface as the wrong payload set.
Adds `test_transaction_producer_fenced_by_epoch` in
tests/transactions.rs. The test creates two transactional producers
that share the same `transactional.id`, drives the first through
`init_transactions`/`begin_transaction`/`send`, then runs
`init_transactions` on the second producer to bump the epoch on the
broker. The first producer's subsequent `commit_transaction` is then
expected to fail with a `KafkaError::Transaction` whose code is one of
the librdkafka fencing codes (`Fenced`, `InvalidProducerEpoch`, or
`ProducerFenced`). A binding regression that pretended the fenced
commit succeeded, or that mapped the fencing code into a different
error variant, would surface here.
…ence

Adds `test_base_producer_statistics_callback_invoked` in
tests/base_producer.rs. The test wires the existing `CollectingContext`,
configures `statistics.interval.ms=100`, runs a send loop for ~400ms,
and then asserts at least two stats callbacks landed and that each
delivered `Statistics` carries a non-empty `name` and a `client_type`
of `"producer"`. A binding regression that swallowed the stats
callback, or that deserialised the JSON into a partial/default struct,
would fail one of those assertions.
@j7nw4r j7nw4r marked this pull request as ready for review May 22, 2026 17:47
@j7nw4r
Copy link
Copy Markdown
Contributor Author

j7nw4r commented May 22, 2026

@fede1024 Sorry had serious health issues so this went by the wayside. I was able to finish this. If it's still wanted please take a look. I added a tests/README.md that may help in understanding.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant