test: converting test suite to TestContainers #798
Open
j7nw4r wants to merge 57 commits into
Within a test binary, tests share one KafkaContext via OnceCell and default to running in parallel, so the broker's consumer-group coordinator state when test_delete_unknown_group runs depends on whether another test in the binary has touched a group first. A cold broker returns NotCoordinator; a warmed broker returns GroupIdNotFound. Both are correct "the group does not exist" responses, so accept either rather than relying on test ordering.
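A minimal sketch of the "accept either code" assertion, using only the standard library. `GroupError` and `is_unknown_group` are hypothetical names for illustration; the real test matches on rdkafka's own error-code type.

```rust
/// Hypothetical stand-in for the error codes a delete-group call can surface.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum GroupError {
    GroupIdNotFound,
    NotCoordinator,
    NonEmptyGroup,
}

/// True when the code means "this group does not exist", whether the
/// coordinator was cold (NotCoordinator) or warmed (GroupIdNotFound).
fn is_unknown_group(code: GroupError) -> bool {
    matches!(code, GroupError::GroupIdNotFound | GroupError::NotCoordinator)
}

fn main() {
    for code in [GroupError::GroupIdNotFound, GroupError::NotCoordinator] {
        assert!(is_unknown_group(code), "{code:?} should be accepted");
    }
    // Any other code still fails the assertion.
    assert!(!is_unknown_group(GroupError::NonEmptyGroup));
}
```

Matching on a closed set of codes keeps the test independent of ordering while still rejecting unrelated failures.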
Each CI matrix row passed KAFKA_VERSION (3.7-4.0), but the testcontainers init was using the crate's hard-coded default tag for every run, so the matrix was redundant. Resolve the env var into a specific apache/kafka tag at startup and use the JVM image variant (the kafka-native variant doesn't publish 3.7.x tags).
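The env-var resolution could look roughly like the sketch below. The function name, the patch versions, and the fallback for an unset variable are all assumptions made for illustration, not the values the PR pins.

```rust
use std::env;

/// Map a major.minor `KAFKA_VERSION` value to a full image reference.
/// NOTE: the patch versions and the fallback for an unset variable are
/// placeholders for illustration only.
fn resolve_kafka_image(version: Option<&str>) -> Result<String, String> {
    let tag = match version {
        Some("3.7") => "3.7.2",
        Some("3.8") => "3.8.1",
        Some("3.9") => "3.9.1",
        Some("4.0") | None => "4.0.0",
        Some(other) => return Err(format!("unsupported KAFKA_VERSION: {other}")),
    };
    Ok(format!("apache/kafka:{tag}"))
}

fn main() {
    // Read the matrix value once at startup and resolve it to an image.
    let requested = env::var("KAFKA_VERSION").ok();
    match resolve_kafka_image(requested.as_deref()) {
        Ok(image) => println!("starting broker from {image}"),
        Err(reason) => eprintln!("{reason}"),
    }
}
```

Rejecting unknown values instead of silently falling back is one way to keep a matrix row from testing the wrong broker again.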
Add a junior-engineer walkthrough of the testcontainers-based suite under tests/README.md, covering: prerequisites, the shared KafkaContext / OnceCell pattern, a worked example of writing a new test, the helper cheatsheet, known quirks (transactions RF override, the unreachable-broker timeout, GroupIdNotFound vs NotCoordinator), the CI matrix, and troubleshooting. Replace the stale "Tests" section in CONTRIBUTING.md with a pointer to it; the old section still described ./test_suite.sh + KAFKA_HOST, which the testcontainers swap obsoleted.

Drafting the doc surfaced dead code in tests/utils/mod.rs that was left over from the docker-compose era:
- get_bootstrap_server() read the now-unset KAFKA_HOST env var.
- populate_topic() had no live callers (all sites were commented out).
- create_topic(name, partitions) was shadowed by the new admin::create_topic helper.
- ProducerTestContext was only referenced from the dead populate_topic.
- consumer_config() had one live caller (test_invalid_max_poll_interval) and was the only thing still wired to get_bootstrap_server.

Remove the first four; give consumer_config an explicit bootstrap_servers parameter and update the caller to pass KafkaContext::shared().bootstrap_servers. Also drop two leftover commented-out populate_topic lines in tests/stream_consumers.rs that sat next to their live replacements. No behavior change; cargo fmt / clippy / clippy --tests stay clean.
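The shared-context pattern and the explicit `bootstrap_servers` parameter can be sketched with the standard library alone. This is a simplified stand-in: it uses the synchronous `std::sync::OnceLock` rather than an async once-cell, the address and `group.id` are made up, and `start_count` is a hypothetical helper added only to make the "initialised once" property observable.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::OnceLock;
use std::thread;

/// Counts how many times the (simulated) broker was started.
static STARTS: AtomicUsize = AtomicUsize::new(0);

/// Hypothetical, simplified stand-in for the suite's shared context.
struct KafkaContext {
    bootstrap_servers: String,
}

impl KafkaContext {
    /// Initialise on first use; every later caller gets the same instance.
    fn shared() -> &'static KafkaContext {
        static CONTEXT: OnceLock<KafkaContext> = OnceLock::new();
        CONTEXT.get_or_init(|| {
            STARTS.fetch_add(1, Ordering::SeqCst);
            // A real context would start a container here; the address is made up.
            KafkaContext { bootstrap_servers: "127.0.0.1:32768".to_string() }
        })
    }
}

fn start_count() -> usize {
    STARTS.load(Ordering::SeqCst)
}

/// Takes the broker address explicitly instead of reading an env var.
fn consumer_config(bootstrap_servers: &str) -> Vec<(String, String)> {
    vec![
        ("bootstrap.servers".to_string(), bootstrap_servers.to_string()),
        ("group.id".to_string(), "example-group".to_string()),
    ]
}

fn main() {
    // Four "tests" race to obtain the context; only one initialisation runs.
    let handles: Vec<_> = (0..4)
        .map(|_| thread::spawn(|| KafkaContext::shared().bootstrap_servers.clone()))
        .collect();
    for handle in handles {
        let servers = handle.join().expect("thread panicked");
        assert_eq!(consumer_config(&servers)[0].1, servers);
    }
    assert_eq!(start_count(), 1);
}
```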
The integration tests own their broker through testcontainers now, so test_suite.sh's docker-compose orchestration is no longer load-bearing: the Bitnami broker it spun up was never reached by any test in the new suite. Inline the only step that matters (cargo test, with KAFKA_VERSION passed through) into the CI test job and delete the surrounding files:
- delete docker-compose.yaml (Bitnami Kafka container; unused).
- delete test_suite.sh (its only live work was `cargo test` plus the smol / async-std example smoke runs; see below).
- the explicit `git submodule update --init` it ran is redundant with rdkafka-sys's build.rs, which initialises the librdkafka submodule itself. The other CI jobs already rely on this.

One coverage item drops with the script: the smol and async-std runtime examples used to be invoked at the end of test_suite.sh as e2e smoke tests against the docker-compose broker. The `check` job still builds them via `cargo build --all-targets`, so compile coverage is preserved; runtime coverage is not. If we want it back it belongs in its own job (a service-container broker plus two `cargo run --example ...` steps) rather than entangled with the integration suite.

Also drop the matching "easy follow-up" note from tests/README.md.
Two dangling pieces of the pre-testcontainers test infrastructure that nothing in the tree now references:
- Dockerfile: copied a docker/run_tests.sh that doesn't exist, set KAFKA_HOST=kafka:9092 (a dead env var after the testcontainers swap), and pinned Rust 1.74 (the MSRV is 1.85). No build, workflow, or README in the repo invokes it.
- rdkafka.suppressions: valgrind suppression file. Its only caller was the run_with_valgrind helper inside the just-deleted test_suite.sh, and that helper was itself only reachable from an already-commented-out block.
kcov wrapper that hasn't kept up with the codebase. It globs target/debug/test_* for integration test binaries, which matched the pre-PR-798 layout (test_admin.rs, test_consumer_*) but matches nothing now that integration tests live in per-area files (admin.rs, base_producer.rs, etc.). Nothing in CI or the repo's docs invokes it; the breakage went unnoticed, which is a fair signal that no one is actually running it. The modern alternative is `cargo llvm-cov` if someone wants coverage locally; it is easier to reach for than maintaining a bespoke kcov script.
Bring back the e2e smoke test for the alternative runtime examples that test_suite.sh used to run. It goes in its own job, with a service-container broker, instead of being re-entangled with the integration suite:
- the integration test job uses testcontainers and a random host port, so it can't host examples that hardcode localhost:9092;
- the examples are runtime-correctness checks for smol and async-std, not broker-compatibility checks, so a single pinned Kafka version is enough and matrixing across 3.7-4.0 would be wasted CI time.

The service container is apache/kafka:4.0.2 in KRaft mode with the same single-broker overrides we set in tests/utils/containers.rs (transaction-state-log RF/ISR = 1), exposed on localhost:9092 with a kafka-topics.sh-based healthcheck so the runner waits for the broker before launching cargo. The examples produce / consume one message each and exit non-zero on failure.

Verified locally that both examples compile with `--no-default-features --features cmake-build`. The check job's cargo build --all-targets continues to catch pure compile breakage; this job catches anything that only manifests at runtime.
test_produce_consume_message_queue_nonempty_callback asserts wakeup counts on a split_partition_queue, with the partition assigned at Offset::Beginning. On apache/kafka 3.7.x (JVM image), librdkafka's initial log-start-offset query posts an event to that queue during setup, which invokes the nonempty callback once before any messages have been produced. 3.8 / 3.9 / 4.0 don't show this. The old Bitnami 3.7 image apparently didn't either, which is why the test passed on the pre-matrix-fix CI matrix that was always pulling the 4.x default.

Capture the wakeup count after initial setup as a baseline and assert deltas (baseline + N) instead of absolute counts. The test still verifies the callback's actual contract -- one wakeup per empty-to-non-empty transition -- without depending on broker-version-specific startup events.
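The baseline-plus-delta idea can be shown with a toy model. `ToyQueue` and `run_scenario` are hypothetical; they only mimic the "one wakeup per empty-to-non-empty transition" contract and are not how librdkafka implements its queues.

```rust
/// Toy queue that counts empty-to-non-empty transitions.
struct ToyQueue {
    len: usize,
    wakeups: usize,
}

impl ToyQueue {
    fn new() -> Self {
        ToyQueue { len: 0, wakeups: 0 }
    }

    /// Enqueue one event; fires a wakeup only if the queue was empty.
    fn push(&mut self) {
        if self.len == 0 {
            self.wakeups += 1;
        }
        self.len += 1;
    }

    fn drain(&mut self) {
        self.len = 0;
    }
}

/// Returns (baseline, final) wakeup counts for a run that begins with
/// `startup_events` broker-specific events before any message is produced.
fn run_scenario(startup_events: usize) -> (usize, usize) {
    let mut queue = ToyQueue::new();
    for _ in 0..startup_events {
        queue.push();
        queue.drain();
    }
    // Capture the count after setup instead of assuming it is zero.
    let baseline = queue.wakeups;

    // First burst: three messages into an empty queue is one wakeup.
    for _ in 0..3 {
        queue.push();
    }
    queue.drain();
    // Second burst: one more transition.
    queue.push();

    (baseline, queue.wakeups)
}

fn main() {
    // The delta is the same whether or not a startup event occurred.
    for startup_events in [0, 1] {
        let (baseline, total) = run_scenario(startup_events);
        assert_eq!(total, baseline + 2);
    }
}
```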
Two doc-sync items missed in the preceding commits:
- The CI section listed four jobs; add the new runtime-examples job introduced for the smol / async-std example smoke tests.
- The Known quirks section gains an entry for the apache/kafka 3.7 startup wakeup on the split partition queue, mirroring the test comment in test_produce_consume_message_queue_nonempty_callback.
Adds four FutureProducer round-trip tests in tests/future_producer.rs, one per librdkafka-supported compression codec. Each test produces 64 messages with a non-trivial payload through a producer configured with `compression.type=<codec>`, then consumes them back through a stream consumer and asserts every key/value pair survives the codec. The zstd test is gated on `#[cfg(feature = "zstd")]` because rdkafka-sys's build.rs configures librdkafka with `--disable-zstd` unless the `zstd` Cargo feature is enabled. The CI integration-test job is updated to pass `--features zstd` so the matrix actually exercises that codec. Adds a `create_producer_with_overrides` helper to tests/utils/producer/future_producer.rs and documents it (plus the new `--features zstd` test invocation) in tests/README.md.
Adds `test_future_producer_idempotence` in tests/future_producer.rs. It opens a FutureProducer with `enable.idempotence=true`, sends 1000 distinct payloads in two batches separated by a `flush()`, then consumes them back through a stream consumer and asserts the result has exactly 1000 distinct payloads on offsets 0..999. That is the exactly-once contract librdkafka promises for the idempotent producer, and the binding has to wire the implicit `acks=all` / retries / in-flight cap / sequence-number tracking correctly for the test to pass. A true "forced reconnect midway" requires either a TCP proxy or a privileged in-process disconnect; both are out of scope for the testcontainer setup. The flush boundary is a stand-in that exercises the producer's recovery from a fully-drained pipeline.
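The final assertion amounts to "exactly N distinct payloads on contiguous offsets". A standalone sketch of that check follows; `verify_no_duplicates` is a hypothetical helper, and the records are generated in memory rather than consumed from a broker.

```rust
use std::collections::HashSet;

/// Check that `records` (offset, payload) hold exactly `expected` distinct
/// payloads on contiguous offsets starting at 0.
fn verify_no_duplicates(records: &[(i64, String)], expected: usize) -> Result<(), String> {
    if records.len() != expected {
        return Err(format!("expected {expected} records, got {}", records.len()));
    }
    for (index, (offset, _)) in records.iter().enumerate() {
        if *offset != index as i64 {
            return Err(format!("unexpected offset {offset} at position {index}"));
        }
    }
    let distinct: HashSet<&str> = records.iter().map(|(_, p)| p.as_str()).collect();
    if distinct.len() != expected {
        return Err(format!("only {} distinct payloads", distinct.len()));
    }
    Ok(())
}

fn main() {
    let clean: Vec<(i64, String)> = (0..1000).map(|i| (i, format!("payload-{i}"))).collect();
    assert!(verify_no_duplicates(&clean, 1000).is_ok());

    // A retried send that was written twice shows up as a repeated payload.
    let mut duplicated = clean.clone();
    duplicated[999].1 = "payload-0".to_string();
    assert!(verify_no_duplicates(&duplicated, 1000).is_err());
}
```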
Adds `test_consumer_rebalance_callbacks` in tests/base_consumer.rs. The test wires a `RecordingRebalanceContext` that captures every pre_rebalance and post_rebalance event with the affected partitions, creates a topic with two partitions, drives consumer 1 until both partitions are assigned, then attaches consumer 2 to the same group and drives both consumers until the group converges to one partition each. Assertions then verify that consumer 1 saw a post-rebalance Assign event with both partitions followed by a Revoke (when consumer 2 joined), and that consumer 2 saw a post-rebalance Assign event with exactly one partition. A binding regression that drops one of those callbacks would cause this test to time out or fail the kind/count check.
…e loop

`test_transaction_commit` already calls `send_offsets_to_transaction`, but it produces a fixed `A` payload regardless of what was consumed and only inspects the consumer's committed offset at the end. Adds `test_transaction_send_offsets_consume_transform_produce` in tests/transactions.rs that does a real consume-transform-produce loop in two transactions: each transaction reads a batch, transforms each payload, produces the transformed payload to the output topic, ties the consumer offset advance to the transaction via `send_offsets_to_transaction`, and commits. Assertions then check that the consumer's committed offset equals the total number of consumed messages (it advanced exactly once per batch, in lockstep with the transaction) and that the output topic, read with `isolation.level=read_committed`, contains the transformed payloads in order. A binding regression on `send_offsets_to_transaction` would either skip an offset advance or write the wrong payload through the transactional producer.
Adds `test_base_producer_message_too_large` in tests/base_producer.rs. The test configures a BaseProducer with `message.max.bytes=1024`, calls `send` with a 4096-byte payload, and asserts that the synchronous return is `Err((KafkaError::MessageProduction(RDKafkaErrorCode::MessageSizeTooLarge), _))`. It then sends a baseline small payload and flushes, and asserts the collecting context observed exactly that one delivery, so a binding regression that swallowed the oversized error or silently queued the oversized message would surface as either the wrong error variant or an unexpected second delivery.
Adds `test_future_producer_default_partitioner_is_deterministic` in tests/future_producer.rs. With a six-partition topic and a four-key working set, the test produces 16 copies of each key (no explicit partition) and verifies every copy of a given key landed on the same partition. It also asserts the four keys span at least two distinct partitions, so a regression that collapsed everything onto partition 0 would also fail. A binding break that dropped or rewrote the key on the way into librdkafka would scatter the same key across partitions and surface here.
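The two assertions (same key always lands on one partition; keys spread over at least two partitions) can be sketched as below. `toy_partition` is a deliberately simple stand-in and is not the hash librdkafka's default partitioner uses; `simulate` and `check_key_affinity` are hypothetical helpers.

```rust
use std::collections::{HashMap, HashSet};

/// Toy partitioner: byte sum modulo partition count (illustration only).
fn toy_partition(key: &str, partitions: u32) -> u32 {
    key.bytes().map(u32::from).sum::<u32>() % partitions
}

/// Produce `copies` records for each of `keys` keys and record (key, partition).
fn simulate(keys: u32, copies: u32, partitions: u32) -> Vec<(String, u32)> {
    let mut deliveries = Vec::new();
    for _ in 0..copies {
        for k in 0..keys {
            let key = format!("key-{k}");
            let partition = toy_partition(&key, partitions);
            deliveries.push((key, partition));
        }
    }
    deliveries
}

/// Returns the number of distinct partitions used, or an error naming the
/// first key observed on more than one partition.
fn check_key_affinity(deliveries: &[(String, u32)]) -> Result<usize, String> {
    let mut seen: HashMap<&str, u32> = HashMap::new();
    for (key, partition) in deliveries {
        let first = *seen.entry(key.as_str()).or_insert(*partition);
        if first != *partition {
            return Err(format!("key {key} seen on partitions {first} and {partition}"));
        }
    }
    let used: HashSet<u32> = seen.values().copied().collect();
    Ok(used.len())
}

fn main() {
    // Six partitions, four keys, 16 copies of each key.
    let deliveries = simulate(4, 16, 6);
    let used = check_key_affinity(&deliveries).expect("keys must not scatter");
    assert!(used >= 2, "keys collapsed onto a single partition");
}
```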
Adds `test_consumer_reads_message_headers` in tests/stream_consumers.rs. The existing `test_base_producer_headers` only validates the produce side via the delivery callback; this test produces a message with a mixed header set (a byte-valued header, a str-valued header, an empty header, and an explicitly null header), consumes it through a StreamConsumer, and verifies that `BorrowedMessage::headers` exposes each header in order with the matching key, value type, and value bytes. A binding regression in `BorrowedMessage::headers` or the underlying headers FFI conversion would surface here.
Adds `test_base_producer_flush_drains_inflight` in tests/base_producer.rs. The test queues 200 records on a BaseProducer with `linger.ms=100` so that they stay buffered, asserts that `in_flight_count()` is non-zero immediately after the send loop, then calls `flush(20s)` and asserts both that `in_flight_count()` drops to zero and that the collecting context received exactly 200 successful delivery callbacks. A binding regression that returned from `flush` before the queue had drained, or misreported the in-flight count, would surface either as a non-zero counter or a short delivery callback list.
Adds `test_consumer_offsets_for_times_first_at_or_after` in tests/stream_consumers.rs. The existing `test_produce_consume_with_timestamp` exercises `offsets_for_timestamp` but only with two distinct timestamp values; this test produces 20 messages with a strictly monotonic per-message timestamp and then queries at three points: an exact-match timestamp, a midpoint between two adjacent message timestamps, and a timestamp past the high watermark. The midpoint case asserts the returned offset is the first message at-or-after the query (offset 8 for a query between offsets 7 and 8); the past-watermark case asserts `Offset::End`. A binding regression that returned the nearest-neighbour offset or that mangled the timestamp/offset TPL serialization would fail one of those assertions.
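The "first offset at-or-after" semantics reduce to a lower-bound search over monotonic timestamps. The sketch below models that lookup in memory; `Lookup` and `first_at_or_after` are hypothetical names, the timestamps are made up, and offsets are assumed to equal positions in a fresh single partition.

```rust
/// Result of a timestamp lookup in this toy model.
#[derive(Debug, PartialEq, Eq)]
enum Lookup {
    Offset(i64),
    End,
}

/// Return the offset of the first message whose timestamp is >= `query`,
/// or `End` when every message is older than the query.
/// `timestamps` must be sorted ascending; index i is the message at offset i.
fn first_at_or_after(timestamps: &[i64], query: i64) -> Lookup {
    let index = timestamps.partition_point(|&ts| ts < query);
    if index == timestamps.len() {
        Lookup::End
    } else {
        Lookup::Offset(index as i64)
    }
}

fn main() {
    // 20 messages with strictly increasing timestamps 1000, 1010, ..., 1190.
    let timestamps: Vec<i64> = (0..20).map(|i| 1_000 + 10 * i).collect();

    // Exact match on the message at offset 5.
    assert_eq!(first_at_or_after(&timestamps, 1_050), Lookup::Offset(5));
    // Midpoint between offsets 7 (1070) and 8 (1080) resolves to offset 8.
    assert_eq!(first_at_or_after(&timestamps, 1_075), Lookup::Offset(8));
    // Past the last message.
    assert_eq!(first_at_or_after(&timestamps, 1_191), Lookup::End);
}
```

A nearest-neighbour implementation would return offset 7 or 8 for the midpoint depending on rounding, which is why the midpoint case is the discriminating one.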
Adds `test_consumer_regex_subscription_matches_only_prefixed` in tests/stream_consumers.rs. The test creates two topics that match a unique `^<rand>.*` pattern and one topic that does not, produces a message into each, subscribes a StreamConsumer to the regex, and drives the stream until the consumer's assignment stabilises. It then asserts both that the assignment is exactly the two matching topics and that the non-matching topic was never delivered to the stream. A binding regression that dropped the leading `^` or that crossed up topic ownership during the regex resolve would surface here.
…-sticky

Adds `test_consumer_cooperative_sticky_rebalance_protocol` in tests/stream_consumers.rs. The existing `test_produce_consume_base_incremental_assign_and_unassign` exercises the incremental_assign/incremental_unassign API on a manually-assigned consumer (no group join, so rebalance_protocol stays None). This test joins a group with `partition.assignment.strategy=cooperative-sticky`, drives the consumer until both partitions are assigned, and asserts that `rebalance_protocol()` returns `None` before the join and `Cooperative` after. A binding regression in the `rebalance_protocol` accessor (or that dropped the cooperative-sticky configuration) would surface here.
Adds `test_alter_topic_configs_retention_ms_dynamic_topic` in tests/admin.rs. `test_configs` already covers broker-scoped alter_configs; this test covers the topic-scoped path by creating a fresh topic, altering `retention.ms` with `AlterConfig::new(ResourceSpecifier::Topic(...))`, then issuing `describe_configs` and asserting the entry value updated to the new number and that its `source` switched to `ConfigSource::DynamicTopic`. A binding regression that misrouted a topic-scoped AlterConfigs request to a broker handler, or that misclassified the readback source, would fail one of those assertions.
Adds `test_create_topics_fixed_replication_too_high` in tests/admin.rs. `test_incorrect_replication_factors_are_ignored_when_creating_topics` already exercises the client-side validation path for a `TopicReplication::Variable` mismatch. This test covers the broker-side path: it asks for `TopicReplication::Fixed(3)` against the single-broker container, expects the broker to reject the request, and pins the surfaced error to `RDKafkaErrorCode::InvalidReplicationFactor`. A binding regression that swallowed the per-topic error or remapped the code would fail one of the assertions.
…drop

Adds `test_delete_non_empty_consumer_group` in tests/consumer_groups.rs. The test creates a topic and a `BaseConsumer` in a random group, subscribes the consumer and polls it until it has actually joined the group (assignment is non-empty), calls `delete_groups`, and asserts the per-group result is `Err((<group>, NonEmptyGroup))`. It then drops the consumer (LeaveGroup) and retries `delete_groups` until the broker reports success or the 30s deadline elapses. A binding regression that misclassified the per-group error, or that failed to retry once the member had left, would surface here.
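The retry-until-deadline step might be structured like this generic helper. `retry_until` is a hypothetical name, and the closure simulates a broker that reports the group as non-empty for the first two attempts.

```rust
use std::thread;
use std::time::{Duration, Instant};

/// Call `attempt` until it succeeds or `deadline` has elapsed, sleeping
/// `pause` between tries. Returns the last error on timeout.
fn retry_until<T, E, F>(deadline: Duration, pause: Duration, mut attempt: F) -> Result<T, E>
where
    F: FnMut() -> Result<T, E>,
{
    let started = Instant::now();
    loop {
        match attempt() {
            Ok(value) => return Ok(value),
            Err(error) => {
                if started.elapsed() >= deadline {
                    return Err(error);
                }
                thread::sleep(pause);
            }
        }
    }
}

fn main() {
    // Simulated broker: the group still has a member for the first two calls.
    let mut calls = 0;
    let result = retry_until(Duration::from_secs(30), Duration::from_millis(10), || {
        calls += 1;
        if calls < 3 {
            Err("NonEmptyGroup")
        } else {
            Ok(calls)
        }
    });
    assert_eq!(result, Ok(3));
}
```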
… payloads

Adds `test_transaction_isolation_level_filters_aborted` in tests/transactions.rs. `test_transaction_abort` and `test_transaction_commit` each cover a single transaction in isolation, but they never interleave committed and aborted records on the same topic-partition. This test runs two transactional producers (different `transactional.id`) into the same topic: one commits 7 payloads, the other aborts 5. A `read_committed` consumer must see exactly the 7 committed payloads in order, and a `read_uncommitted` consumer must see all 12 payloads (committed followed by aborted). A binding regression on the `isolation.level` plumbing, or a librdkafka filter break that included or dropped aborted records, would surface as the wrong payload set.
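The expected payload sets can be expressed as a filter over a log in which each record carries its transaction's outcome. This is an in-memory model under that assumption, with hypothetical type and function names; it does not reproduce how the broker or librdkafka tracks aborted transactions.

```rust
#[derive(Clone, Copy, PartialEq, Eq)]
enum Outcome {
    Committed,
    Aborted,
}

#[derive(Clone, Copy)]
enum IsolationLevel {
    ReadCommitted,
    ReadUncommitted,
}

struct Record {
    payload: String,
    outcome: Outcome,
}

/// Payloads a consumer at the given isolation level would observe, in log order.
fn visible(log: &[Record], level: IsolationLevel) -> Vec<&str> {
    log.iter()
        .filter(|record| match level {
            IsolationLevel::ReadCommitted => record.outcome == Outcome::Committed,
            IsolationLevel::ReadUncommitted => true,
        })
        .map(|record| record.payload.as_str())
        .collect()
}

/// Seven committed payloads followed by five aborted ones.
fn build_log() -> Vec<Record> {
    let committed = (0..7).map(|i| Record {
        payload: format!("committed-{i}"),
        outcome: Outcome::Committed,
    });
    let aborted = (0..5).map(|i| Record {
        payload: format!("aborted-{i}"),
        outcome: Outcome::Aborted,
    });
    committed.chain(aborted).collect()
}

fn main() {
    let log = build_log();
    let committed_view = visible(&log, IsolationLevel::ReadCommitted);
    assert_eq!(committed_view.len(), 7);
    assert!(committed_view.iter().all(|p| p.starts_with("committed-")));
    assert_eq!(visible(&log, IsolationLevel::ReadUncommitted).len(), 12);
}
```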
Adds `test_transaction_producer_fenced_by_epoch` in tests/transactions.rs. The test creates two transactional producers that share the same `transactional.id`, drives the first through `init_transactions`/`begin_transaction`/`send`, then runs `init_transactions` on the second producer to bump the epoch on the broker. The first producer's subsequent `commit_transaction` is then expected to fail with a `KafkaError::Transaction` whose code is one of the librdkafka fencing codes (`Fenced`, `InvalidProducerEpoch`, or `ProducerFenced`). A binding regression that pretended the fenced commit succeeded, or that mapped the fencing code into a different error variant, would surface here.
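Epoch fencing can be modelled with a per-`transactional.id` counter: each `init_transactions` bumps the epoch, and a commit carrying a stale epoch is rejected. `ToyCoordinator` and `TxnError` are hypothetical and greatly simplified compared with the broker's transaction coordinator.

```rust
use std::collections::HashMap;

#[derive(Debug, PartialEq, Eq)]
enum TxnError {
    Fenced,
}

/// Toy coordinator tracking the current epoch per transactional.id.
#[derive(Default)]
struct ToyCoordinator {
    epochs: HashMap<String, u32>,
}

impl ToyCoordinator {
    /// Registering a producer bumps the epoch, fencing older instances.
    fn init_transactions(&mut self, transactional_id: &str) -> u32 {
        let epoch = self.epochs.entry(transactional_id.to_string()).or_insert(0);
        *epoch += 1;
        *epoch
    }

    /// A commit succeeds only when the producer holds the current epoch.
    fn commit(&self, transactional_id: &str, producer_epoch: u32) -> Result<(), TxnError> {
        match self.epochs.get(transactional_id) {
            Some(&current) if current == producer_epoch => Ok(()),
            _ => Err(TxnError::Fenced),
        }
    }
}

fn main() {
    let mut coordinator = ToyCoordinator::default();
    let first = coordinator.init_transactions("txn-a");
    // A second producer with the same id takes over and bumps the epoch.
    let second = coordinator.init_transactions("txn-a");

    assert_eq!(coordinator.commit("txn-a", first), Err(TxnError::Fenced));
    assert_eq!(coordinator.commit("txn-a", second), Ok(()));
}
```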
…ence

Adds `test_base_producer_statistics_callback_invoked` in tests/base_producer.rs. The test wires the existing `CollectingContext`, configures `statistics.interval.ms=100`, runs a send loop for ~400ms, and then asserts at least two stats callbacks landed and that each delivered `Statistics` carries a non-empty `name` and a `client_type` of `"producer"`. A binding regression that swallowed the stats callback, or that deserialised the JSON into a partial/default struct, would fail one of those assertions.
@fede1024 Sorry, I had serious health issues so this went by the wayside. I was able to finish this. If it's still wanted, please take a look. I added a tests/README.md that may help in understanding.
Migrate the integration tests from the Bitnami Kafka docker-compose to testcontainers-rs, and substantially extend the suite's binding coverage.

Testcontainers migration

Tests now spin up the broker through testcontainers-modules' apache Kafka image, with a single `KafkaContext` shared across tests in a binary via a `tokio::sync::OnceCell` (see `tests/utils/containers.rs`). `cargo test` works with no out-of-band setup beyond a reachable Docker daemon. The CI test matrix fans out across `KAFKA_VERSION = 3.7 / 3.8 / 3.9 / 4.0` and resolves each row to a specific `apache/kafka:<tag>` image.

The migration also reorganised the integration tests into per-area files under `tests/`, with shared helpers in `tests/utils/`. A few assertions had to flex for the new environment:

- `tests/metadata.rs` uses `BROKER_ID = 1` (the image hardcodes it) and drops the `port == 9092` check (testcontainers maps to a random host port).
- `tests/base_producer.rs::test_base_producer_timeout` points the producer at `127.0.0.1:1` so the 100ms `message.timeout.ms` is exercised reliably; Kafka 4.0 with `auto.create.topics.enable=true` was fast enough on CI to deliver the message before the timeout fired.
- `tests/consumer_groups.rs::test_delete_unknown_group` accepts either `GroupIdNotFound` or `NotCoordinator`, since coordinator state on a cold broker is non-deterministic when this test runs first in the binary.

The single-broker testcontainers image also needs `KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=1` and `KAFKA_TRANSACTION_STATE_LOG_MIN_ISR=1`; without those, the transactions tests hang trying to bring `__transaction_state` up to its default RF of 3.

`testcontainers-modules` is pinned at `0.12.1`. Bumping to `0.13` pulls in `testcontainers 0.25` -> `tonic 0.14` -> rustc 1.88, which is past CI's `rust_version: 1.85` floor. Happy to bump it in a follow-up if you'd rather.

New integration-test coverage

The testcontainers swap above is a straight port of the pre-existing integration tests. On top of that, this PR adds new tests that extend coverage to binding surfaces the upstream suite didn't reach.

- `tests/base_producer.rs`, `tests/future_producer.rs`: compression round-trip per codec (gzip/snappy/lz4/zstd; zstd is feature-gated and CI runs `cargo test --features zstd` so the codec actually links), idempotent producer with mid-stream `flush()`, `MessageSizeTooLarge` on an oversized `BaseProducer::send`, default partitioner key-to-partition determinism, `BorrowedMessage::headers` round-trip on the consumer side, `Producer::flush` drains in-flight.
- `tests/base_consumer.rs`, `tests/stream_consumers.rs`: pre/post rebalance callbacks fire with the expected partition sets, `offsets_for_times` returns the first offset at-or-after, regex subscription assigns only matching topics, cooperative-sticky `rebalance_protocol()` reports `Cooperative` after a group join.
- `tests/admin.rs`, `tests/consumer_groups.rs`: topic-scoped `alter_configs` readback with `ConfigSource::DynamicTopic`, `create_topics` with `Fixed(3)` returns `InvalidReplicationFactor` on a single broker, `delete_groups` returns `NonEmptyGroup` while the consumer is alive and succeeds after the consumer drops.
- `tests/transactions.rs`: `send_offsets_to_transaction` end-to-end on a consume-transform-produce loop, `isolation.level` filters interleaved aborted/committed payloads, producer fencing by epoch (second `init_transactions` with the same `transactional.id` fences the first's `commit_transaction`).
- `tests/base_producer.rs`: `ClientContext::stats` fires on the `statistics.interval.ms` cadence and deserialises into the `Statistics` struct.

For reviewers

There's a walkthrough of the integration suite at `tests/README.md` on this branch. It covers the shared-broker pattern, the helper cheatsheet under `tests/utils/`, how to add a new test, the CI matrix, and a "known quirks" section for the non-obvious assertion shapes (the `test_base_producer_timeout` unreachable-broker trick, the `test_delete_unknown_group` either-error tolerance, the 3.7 wakeup-callback baseline). It's the fastest way to context-switch into the new test suite.