SchemaId header store DSL test for TimestampedKeyValueStoreWithHeaders#4214
Lucy Liu (lucliu1108) wants to merge 39 commits into
There was a problem hiding this comment.
Pull request overview
Adds new DSL-focused integration tests to validate header-based SchemaId propagation with TimestampedKeyValueStoreWithHeaders, while removing the older stateless header schema ID integration test and adjusting test dependencies.
Changes:
- Added TimestampedKeyValueStoreWithHeadersDslIntegrationTest covering groupBy/count and KTable.filter with headers-aware stores + IQv1 querying.
- Removed KafkaStreamsHeaderSchemaIdIntegrationTest (stateless filter test).
- Updated streams-integration-tests Maven deps to Kafka 4.3.0-SNAPSHOT artifacts (clients/streams/test-utils).
Reviewed changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 9 comments.
| File | Description |
|---|---|
| streams-integration-tests/src/test/java/io/confluent/kafka/streams/integration/dsl/TimestampedKeyValueStoreWithHeadersDslIntegrationTest.java | New integration test verifying SchemaId headers are preserved in headers-aware state stores + IQv1 queries |
| streams-integration-tests/src/test/java/io/confluent/kafka/streams/integration/KafkaStreamsHeaderSchemaIdIntegrationTest.java | Removed older stateless header-schema-id integration test |
| streams-integration-tests/pom.xml | Pins Kafka test dependencies to 4.3.0-SNAPSHOT and adds test-utils |
| assertKeySchemaIdHeader(streamsResult.headers(), "IQv1 get streams"); | ||
|
|
||
| ValueTimestampHeaders<Long> helloResult = store.get(createKey("hello")); | ||
| assertNotNull(streamsResult, "IQv1: hello should exist in store"); |
There was a problem hiding this comment.
assertNotNull is checking streamsResult instead of helloResult, which can mask failures (or fail for the wrong reason). Update the assertion to validate helloResult is non-null.
| assertNotNull(streamsResult, "IQv1: hello should exist in store"); | |
| assertNotNull(helloResult, "IQv1: hello should exist in store"); |
| List<ConsumerRecord<GenericRecord, GenericRecord>> results = | ||
| consumeRecords(outputTopic, "dsl-filter-consumer", 2); | ||
|
|
||
| assertEquals(2, results.size(), "Only the long lines should pass filter"); | ||
| assertEquals("long", results.get(0).key().get("word").toString()); | ||
| assertEquals("this is a long long line", results.get(0).value().get("line").toString()); | ||
| assertSchemaIdHeaders(results.get(0).headers(), "filter output"); | ||
| assertEquals("long2", results.get(1).key().get("word").toString()); | ||
| assertEquals("this is another long line", results.get(1).value().get("line").toString()); | ||
| assertSchemaIdHeaders(results.get(1).headers(), "filter output2"); |
There was a problem hiding this comment.
This topology uses KTable#filter, which preserves table semantics and can emit tombstones (null values) for records that do not satisfy the predicate. With the current assertions, a tombstone for "short" may be consumed and either (a) make results.size() exceed 2 or (b) cause results.get(i).value() to be null, making the test flaky. Consider either filtering out tombstones when consuming/asserting (e.g., only keep records with non-null values), or update the expected output to include and validate the tombstone, or switch to a KStream filter if the intent is “drop” semantics.
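As an illustration of the first option above (ignoring tombstones before asserting), here is a minimal, dependency-free sketch. Rec is a hypothetical stand-in for ConsumerRecord, and dropTombstones is an illustrative helper name, not something that exists in this PR.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class TombstoneFilterSketch {
    // Hypothetical stand-in for ConsumerRecord<K, V>; only key and value matter here.
    static final class Rec<K, V> {
        final K key;
        final V value;

        Rec(K key, V value) {
            this.key = key;
            this.value = value;
        }
    }

    // Keeps records whose value is non-null (drops tombstones), preserving order.
    static <K, V> List<Rec<K, V>> dropTombstones(List<Rec<K, V>> records) {
        List<Rec<K, V>> live = new ArrayList<>();
        for (Rec<K, V> r : records) {
            if (r.value != null) {
                live.add(r);
            }
        }
        return live;
    }

    public static void main(String[] args) {
        List<Rec<String, String>> consumed = Arrays.asList(
            new Rec<String, String>("long", "this is a long long line"),
            new Rec<String, String>("short", null), // tombstone a KTable filter may emit
            new Rec<String, String>("long2", "this is another long line"));
        List<Rec<String, String>> live = dropTombstones(consumed);
        System.out.println("live records: " + live.size());
    }
}
```

With the tombstone removed up front, the size and index-based assertions no longer depend on whether the tombstone for "short" happened to be consumed.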
| private void createTopics(String... topicNames) throws Exception { | ||
| Properties adminProps = new Properties(); | ||
| adminProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList); | ||
| try (AdminClient admin = AdminClient.create(adminProps)) { |
There was a problem hiding this comment.
AdminClient configuration should use AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG rather than ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG. While the key string is the same today, using the Admin constant avoids confusing config intent and matches client-specific conventions.
|
|
||
| package io.confluent.kafka.streams.integration.dsl; | ||
|
|
||
| import static org.apache.kafka.streams.KeyValue.pair; |
There was a problem hiding this comment.
These imports appear unused within this new test file. Removing them reduces noise and avoids checkstyle/spotbugs issues in builds that enforce unused-import rules.
| import static org.apache.kafka.streams.KeyValue.pair; |
| import org.apache.kafka.common.header.Header; | ||
| import org.apache.kafka.common.header.Headers; | ||
| import org.apache.kafka.common.serialization.Serdes; | ||
| import org.apache.kafka.common.utils.Bytes; |
There was a problem hiding this comment.
These imports appear unused within this new test file. Removing them reduces noise and avoids checkstyle/spotbugs issues in builds that enforce unused-import rules.
| import org.apache.kafka.common.utils.Bytes; |
| import org.apache.kafka.streams.kstream.Materialized; | ||
| import org.apache.kafka.streams.kstream.Produced; | ||
| import org.apache.kafka.streams.processor.StateStore; | ||
| import org.apache.kafka.streams.state.KeyValueStore; |
There was a problem hiding this comment.
These imports appear unused within this new test file. Removing them reduces noise and avoids checkstyle/spotbugs issues in builds that enforce unused-import rules.
| import org.apache.kafka.streams.state.KeyValueStore; |
| }, Grouped.with(keySerde, Serdes.String())) | ||
| .count(Materialized.<GenericRecord, Long>as( | ||
| Stores.persistentTimestampedKeyValueStoreWithHeaders(storeName)) | ||
| .withKeySerde(keySerde)) |
There was a problem hiding this comment.
The materialized store config sets the key serde but not the value serde. For clarity and to avoid relying on internal/default serde inference for count(), set the value serde explicitly (e.g., withValueSerde(Serdes.Long())).
| .withKeySerde(keySerde)) | |
| .withKeySerde(keySerde) | |
| .withValueSerde(Serdes.Long())) |
| <dependency> | ||
| <groupId>org.apache.kafka</groupId> | ||
| <artifactId>kafka-clients</artifactId> | ||
| <version>4.3.0-SNAPSHOT</version> | ||
| <scope>test</scope> | ||
| <exclusions> | ||
| <exclusion> | ||
| <groupId>io.confluent.networking</groupId> | ||
| <artifactId>cc-custom-dns-resolver-java18</artifactId> | ||
| </exclusion> | ||
| </exclusions> | ||
| </dependency> | ||
| <dependency> | ||
| <groupId>org.apache.kafka</groupId> | ||
| <artifactId>kafka-clients</artifactId> | ||
| <version>4.3.0-SNAPSHOT</version> | ||
| <classifier>test</classifier> | ||
| <scope>test</scope> | ||
| <exclusions> |
There was a problem hiding this comment.
Pinning multiple Kafka artifacts to a *-SNAPSHOT version in-module can make CI/dev builds non-reproducible and fail depending on repository availability, and it increases the risk of dependency convergence/binary compatibility issues with the rest of the repo. Prefer aligning versions via the parent <dependencyManagement> (or a single ${kafka.version} property) and consider gating SNAPSHOT usage behind a Maven profile if it’s only required for specific CI runs. Also, having two kafka-clients deps (one normal + one test classifier) at different “coordinates” is valid, but it’s worth double-checking it’s required; if the goal is just exclusions, applying them only where needed can reduce classpath complexity.
| } | ||
|
|
||
| /** | ||
| * Verifies `groupBy()` and `count()` works correctly use headers-aware stores. |
There was a problem hiding this comment.
Grammar fix: change “works correctly use” to “work correctly using” (also “groupBy() and count()” is plural).
| * Verifies `groupBy()` and `count()` works correctly use headers-aware stores. | |
| * Verifies `groupBy()` and `count()` work correctly using headers-aware stores. |
| </dependency> | ||
| <dependency> | ||
| <groupId>org.apache.kafka</groupId> | ||
| <artifactId>kafka-clients</artifactId> |
There was a problem hiding this comment.
Why do we have <artifactId>kafka-clients</artifactId> two times?
Alieh Saeedi (aliehsaeedii)
left a comment
There was a problem hiding this comment.
Thanks Lucy Liu (@lucliu1108). I left a comment. Also, Copilot caught some things. Please check all of them; they seem valid to me.
| wordKey.put("word", word); | ||
| return wordKey; | ||
| }, Grouped.with(keySerde, Serdes.String())) | ||
| .count(Materialized.<GenericRecord, Long>as( |
There was a problem hiding this comment.
Since we want to test each operator separately, we should check the contents of the corresponding state store for each one. The safest approach is to verify both that the state store contains the expected schema IDs and that each operator’s output can be deserialized correctly. We should do this for both key and value, and also consider corner cases where the value is null or the key is null.
Alieh Saeedi (aliehsaeedii)
left a comment
There was a problem hiding this comment.
Thanks Lucy Liu (@lucliu1108) for the tests. I made a pass.
Besides that, the body lists 8 tests, but the file has 12. Please update the description.
| assertKeySchemaIdHeader(helloResult.headers(), "IQv1 get hello"); | ||
|
|
||
| // Send a null value to a KStream with flatMapValues causes | ||
| // a NullPointerException, which crashes the streams instance into ERROR state. |
There was a problem hiding this comment.
The only corner case this test exercises is "send null to flatMapValues → topology crashes to ERROR". That's a valid thing to pin down, but it kills the streams instance before any other corner case can run, so the count store's tombstone / decrement-style behavior is never exercised here — unlike the sibling aggregate and reduce tests, which both verify the "operator returns null → tombstone" path on their stores.
Two options:
- Reorder: run a real-world corner case first (e.g. an existing key receiving a tombstone update, or a key whose count drops back to 0), then send the poison-pill null last.
- Split into two tests: one for the happy path + tombstone, one for the NPE-crash, each with its own streams instance.
There was a problem hiding this comment.
I split into 2 tests:
- shouldCrashOnNullValueInFlatMapValues: keeps the original pipeline (flatMapValues → groupBy → count) and exercises the NPE-crash corner case.
- shouldGroupCountWithHeaders: drops flatMapValues (groupByKey → count) and asserts that a null input value is skipped by count rather than tombstoning the existing entry.
| assertEquals(KafkaStreams.State.ERROR, streams.state(), | ||
| "Streams should be in ERROR state after null value hits flatMapValues"); | ||
|
|
||
| // Changelog verification (must run before streams crashes — use records already written) |
There was a problem hiding this comment.
With cachingEnabled = true and commit.interval.ms = 100 (set in createStreamsProps), there's a real window where the cache hasn't been flushed yet when the poison-pill null arrives, and the changelog topic could legitimately have fewer than 8 records. The >= 8 assertion can make the test flaky on a loaded CI runner. Maybe force a flush before sending the null.
There was a problem hiding this comment.
I modified the structure to do the flush and changelog verification before the poison-pill record is sent, since the poison pill shouldn't be written into the changelog anyway.
| */ | ||
| @ParameterizedTest | ||
| @ValueSource(booleans = {true, false}) | ||
| public void shouldMapValuesWithHeaders(boolean cachingEnabled) throws Exception { |
There was a problem hiding this comment.
This test only covers the case where mapValues is unmaterialized — the result is read by the downstream to(...) via ValueGetter against the source table's store. So the IQv1 verification and the changelog verification both target sourceStoreName, never a mapValues output store. That leaves a real gap: when a user writes table.mapValues(fn, Materialized.as(Stores.persistentTimestampedKeyValueStoreWithHeaders(...))) the transformed value is what gets written to the store and the changelog, and the headers attached to that materialization are a separate code path. Nothing in the suite proves headers survive that.
Could you add a second case (or a sibling test) that materializes the mapValues result with persistentTimestampedKeyValueStoreWithHeaders, then verifies:
- The mapValues output store via IQv1: keys present, values are the transformed form, key-schema-id header attached.
- The mapValues output store's changelog: same checks plus tombstone propagation.
The same gap exists in shouldTransformValuesWithHeaders.
There was a problem hiding this comment.
Added an extra param to test materialized/unmaterialized version of the mapValues and transformValues test.
Alieh Saeedi (aliehsaeedii)
left a comment
There was a problem hiding this comment.
Missing tests:
- globalTable() — uses a KV store, completely uncovered.
- KStream.toTable(Materialized) as a primary subject (only used incidentally inside the disabled suppress test).
- KStream.process() / processValues() with an attached KV state store (PAPI-on-DSL).
- KStream.cogroup() / KTable.cogroup() → aggregate.
- Stateless KStream.filter — the deleted KafkaStreamsHeaderSchemaIdIntegrationTest covered this; the new file's filter coverage is KTable-only.
| createKey("hello"), (GenericRecord) null)).get(); | ||
| producer.flush(); | ||
| } | ||
| Thread.sleep(2000); |
There was a problem hiding this comment.
Thread.sleep(...) is used as a sync barrier in many places. Replace it with TestUtils.waitForCondition(...); sleeps are flaky on loaded CI.
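To make the difference concrete, here is a dependency-free sketch of the polling idea behind a condition-based wait. It is not Kafka's TestUtils implementation; the class and method here are hypothetical stand-ins whose parameter order (condition, max wait, message) merely resembles the call mentioned above.

```java
import java.util.function.BooleanSupplier;

class PollingWaitSketch {
    // Polls the condition until it holds or maxWaitMs elapses; fails with the message on timeout.
    static void waitForCondition(BooleanSupplier condition, long maxWaitMs, String message) {
        long deadline = System.nanoTime() + maxWaitMs * 1_000_000L;
        while (!condition.getAsBoolean()) {
            if (System.nanoTime() - deadline >= 0) {
                throw new AssertionError("Condition not met within " + maxWaitMs + " ms: " + message);
            }
            try {
                Thread.sleep(20L); // short poll interval instead of one long fixed sleep
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new AssertionError("Interrupted while waiting: " + message, e);
            }
        }
    }

    public static void main(String[] args) {
        long start = System.currentTimeMillis();
        // Stand-in condition: becomes true roughly 100 ms after start.
        waitForCondition(() -> System.currentTimeMillis() - start >= 100,
            5_000L, "100 ms should elapse");
        System.out.println("condition met");
    }
}
```

In a test, the condition would typically check something observable (for example, that the store returns the expected value), so the wait ends as soon as the state is reached and fails with a clear message if it never is.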
| List<ConsumerRecord<GenericRecord, GenericRecord>> results = | ||
| consumeRecords(outputTopic, "dsl-aggregate-consumer" + suffix, maxExpected); | ||
|
|
||
| assertTrue(results.size() >= minExpected && results.size() <= maxExpected, |
There was a problem hiding this comment.
assertTrue(results.size() >= N, ...) patterns in many places mask under-counts: the test then calls Map.get(key), which silently returns null for a missing record. After deterministic waits, switch to assertEquals(N, ...).
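A small sketch of the stricter pattern, using only the JDK. The helper names indexExactly and require are hypothetical, and Map.Entry stands in for a consumed record; the point is that an under-count and a missing key both fail loudly instead of surfacing later as a null.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class ExactCountSketch {
    // Fails unless exactly `expected` records arrived, then indexes them by key (last write wins).
    static <K, V> Map<K, V> indexExactly(List<Map.Entry<K, V>> records, int expected) {
        if (records.size() != expected) {
            throw new AssertionError("expected " + expected + " records but got " + records.size());
        }
        Map<K, V> byKey = new LinkedHashMap<>();
        for (Map.Entry<K, V> r : records) {
            byKey.put(r.getKey(), r.getValue());
        }
        return byKey;
    }

    // Like Map.get, but a missing key is a failure instead of a silent null.
    static <K, V> V require(Map<K, V> byKey, K key) {
        if (!byKey.containsKey(key)) {
            throw new AssertionError("no record for key: " + key);
        }
        return byKey.get(key);
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Long>> results = new ArrayList<>();
        results.add(new SimpleEntry<String, Long>("hello", 2L));
        results.add(new SimpleEntry<String, Long>("streams", 1L));
        Map<String, Long> byKey = indexExactly(results, 2);
        System.out.println("hello -> " + require(byKey, "hello"));
    }
}
```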
| props.put(StreamsConfig.APPLICATION_ID_CONFIG, appId); | ||
| props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList); | ||
| props.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, restApp.restConnect); | ||
| // props.put(StreamsConfig.DSL_STORE_FORMAT_CONFIG, StreamsConfig.DSL_STORE_FORMAT_HEADERS); |
There was a problem hiding this comment.
Delete or document why it's not used.
|
|
||
| private Properties createStreamsProps(String appId, boolean cachingEnabled) { | ||
| Properties props = new Properties(); | ||
| props.put(StreamsConfig.APPLICATION_ID_CONFIG, appId); |
There was a problem hiding this comment.
The applicationId is fixed per test. streams.cleanUp() helps, but adding UUID.randomUUID() would protect against rerun collisions on local state-store dirs.
There was a problem hiding this comment.
added a randomUUID for state-store dir creation.
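For readers following along, a minimal sketch of what a per-run state directory could look like. This is an assumption about the shape of the change, not the PR's actual code: the class and method names are hypothetical, and the raw keys "application.id" and "state.dir" are used in place of the StreamsConfig constants so the snippet has no Kafka dependency.

```java
import java.nio.file.Paths;
import java.util.Properties;
import java.util.UUID;

class UniqueStateDirSketch {
    // Builds props with a fixed application id but a fresh state directory per call.
    static Properties streamsProps(String appId) {
        Properties props = new Properties();
        // String keys behind StreamsConfig.APPLICATION_ID_CONFIG and STATE_DIR_CONFIG.
        props.put("application.id", appId);
        String stateDir = Paths.get(
            System.getProperty("java.io.tmpdir"),
            "kafka-streams-" + UUID.randomUUID()).toString();
        props.put("state.dir", stateDir);
        return props;
    }

    public static void main(String[] args) {
        Properties props = streamsProps("dsl-count-test");
        System.out.println(props.getProperty("application.id"));
        System.out.println(props.getProperty("state.dir"));
    }
}
```

Randomizing the state directory rather than the application id keeps the internal topic names stable, which matters for tests that consume changelog topics by name, since those names are prefixed with the application id.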
| } | ||
| } | ||
|
|
||
| private List<ConsumerRecord<GenericRecord, Long>> consumeLongValueRecords( |
There was a problem hiding this comment.
Near-duplicate of other 2 consume record helpers. Consolidate into one generic helper parameterized on the value deserializer.
There was a problem hiding this comment.
Removed the other 2 and kept only a generic helper.
| */ | ||
| @ParameterizedTest | ||
| @ValueSource(booleans = {true, false}) | ||
| public void shouldTransformValuesWithHeaders(boolean cachingEnabled) throws Exception { |
There was a problem hiding this comment.
only the unmaterialized form is tested (reads source store via ValueGetter). The materialized-output variant — mapValues(fn, Materialized.as(...withHeaders...)) / transformValues(... Materialized.as(...withHeaders...)) — is never exercised, so headers on the transformed store + its changelog aren't verified.
There was a problem hiding this comment.
Added its materialized version + changelog verification.
| */ | ||
| @ParameterizedTest | ||
| @ValueSource(booleans = {true, false}) | ||
| public void shouldFilterAndFilterNotWithHeaders(boolean cachingEnabled) throws Exception { |
There was a problem hiding this comment.
only filterStoreName-changelog is consumed; the source-table changelog (where the upstream tombstone first lands) is not.
There was a problem hiding this comment.
Added verification for source table changelog.
| */ | ||
| @ParameterizedTest | ||
| @ValueSource(booleans = {true, false}) | ||
| public void shouldMergeStreamsWithHeaders(boolean cachingEnabled) throws Exception { |
There was a problem hiding this comment.
only storeName1-changelog; storeName2-changelog missing. The tombstone is also sent on inputTopic1 only; it needs a symmetric one on inputTopic2.
There was a problem hiding this comment.
Added a tombstone on inputTopic2, and verification on storeName2-changelog.
| */ | ||
| @ParameterizedTest | ||
| @ValueSource(booleans = {true, false}) | ||
| public void shouldPrefixScanWithHeaders(boolean cachingEnabled) throws Exception { |
There was a problem hiding this comment.
happy path only. No tombstone path, no empty-prefix scan, no missing-prefix scan, no multi-result scan (e.g. prefix "k" should return both ka and kb), no boundary keys.
There was a problem hiding this comment.
Multi-result scans don't work as expected: Avro uses variable-length encoding, so the serialized key first encodes the length of the string, then the string itself. In this case prefixScan can therefore only match the exact string.
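The encoding argument can be checked with a few lines of plain Java. The sketch below hand-rolls the Avro binary encoding of a string (a zig-zag varint length followed by the UTF-8 bytes) and assumes the key is a record with a single string field and no extra framing in front of the payload; any fixed framing prefix would not change the conclusion. Class and method names are illustrative.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

class AvroStringPrefixSketch {
    // Avro binary string: zig-zag varint of the byte length, then the UTF-8 bytes.
    static byte[] encodeString(String s) {
        byte[] utf8 = s.getBytes(StandardCharsets.UTF_8);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        long n = utf8.length;
        long zz = (n << 1) ^ (n >> 63); // zig-zag encode the length
        while ((zz & ~0x7FL) != 0) {    // emit 7 bits at a time, low bits first
            out.write((int) ((zz & 0x7F) | 0x80));
            zz >>>= 7;
        }
        out.write((int) zz);
        out.write(utf8, 0, utf8.length);
        return out.toByteArray();
    }

    // Byte-wise prefix check, which is what a prefix scan over serialized keys relies on.
    static boolean startsWith(byte[] data, byte[] prefix) {
        if (prefix.length > data.length) {
            return false;
        }
        for (int i = 0; i < prefix.length; i++) {
            if (data[i] != prefix[i]) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        byte[] k = encodeString("k");   // [2, 107]
        byte[] ka = encodeString("ka"); // [4, 107, 97]
        System.out.println(Arrays.toString(k));
        System.out.println(Arrays.toString(ka));
        System.out.println(startsWith(ka, k)); // false: the length bytes already differ
    }
}
```

Because the first byte encodes the string length, the serialized form of "k" is not a byte prefix of the serialized "ka" or "kb", so a scan with prefix "k" finds neither.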
| */ | ||
| @ParameterizedTest | ||
| @ValueSource(booleans = {true, false}) | ||
| public void shouldForeignKeyJoinWithHeaders(boolean cachingEnabled) throws Exception { |
There was a problem hiding this comment.
happy path only. No FK tombstone, no FK re-routing, no null-joiner-result. Changelog block only checks the join-result store, not the names/ages source-store changelogs and not the internal subscription changelogs.
Alieh Saeedi (@aliehsaeedii) Thanks for the review
Summary
Add DSL-level integration tests for TimestampedKeyValueStoreWithHeaders to verify that schema ID headers are correctly propagated through Kafka Streams DSL operations backed by header-aware state stores.
shouldGroupCountWithHeaders
Ops: stream → flatMapValues → groupBy → count → to
shouldGroupAndAggregateWithHeaders
Ops: stream → groupByKey → aggregate → to
shouldGroupAndReduceWithHeaders
Ops: stream → groupByKey → reduce → to
shouldMapValuesWithHeaders
Ops: table (materialized) → mapValues (unmaterialized, uses ValueGetter) → to
shouldFilterAndFilterNotWithHeaders
Ops: table (materialized) → filterNot (unmaterialized) → filter (materialized) → to
shouldJoinTablesWithHeaders
Ops: Two tables → join (inner), leftJoin, outerJoin → to (3 output topics)
shouldMergeStreamsWithHeaders
Ops: Two tables (materialized) → toStream → merge → to
shouldTransformValuesWithHeaders
Ops: table (materialized) → transformValues (unmaterialized, uses ValueGetter) → to