Skip to content

SchemaId header store DSL test for TimestampedKeyValueStoreWithHeaders#4214

Open
Lucy Liu (lucliu1108) wants to merge 39 commits into
confluentinc:masterfrom
lucliu1108:dsl-test
Open

SchemaId header store DSL test for TimestampedKeyValueStoreWithHeaders#4214
Lucy Liu (lucliu1108) wants to merge 39 commits into
confluentinc:masterfrom
lucliu1108:dsl-test

Conversation

@lucliu1108
Copy link
Copy Markdown
Member

@lucliu1108 Lucy Liu (lucliu1108) commented Mar 20, 2026

Summary

Add DSL-level integration tests for TimestampedKeyValueStoreWithHeaders to verify that schema ID headers are correctly propagated through windowed Kafka Streams operations backed by header-aware state stores.

  1. shouldGroupCountWithHeaders
    Ops: stream → flatMapValues → groupBy → count → to

    • Edge case: Null value to flatMapValues → crashes streams into ERROR state (NullPointerException)
  2. shouldGroupAndAggregateWithHeaders
    Ops: stream → groupByKey → aggregate → to

    • Edge case:: Null input value → aggregate skips it (count unchanged)
    • Edge case:: Aggregator returns null for "DELETE" → tombstones the key
  3. shouldGroupAndReduceWithHeaders
    Ops: stream → groupByKey → reduce → to

    • Edge case: Null input value → reduce skips it (value unchanged)
    • Edge case: Reducer returns null for "DELETE" → tombstones the key
  4. shouldMapValuesWithHeaders
    Ops: table (materialized) → mapValues (unmaterialized, uses ValueGetter) → to

    • Edge case: Tombstone (null value) → deletes key from source store
  5. shouldFilterAndFilterNotWithHeaders
    Ops: table (materialized) → filterNot (unmaterialized) → filter (materialized) → to

    • Edge case: Tombstone → deletes key from filter store; filtered-out keys absent from filter store
  6. shouldJoinTablesWithHeaders
    Ops: Two tables → join (inner), leftJoin, outerJoin → to (3 output topics)

    • Edge case: Tombstone on right table (delete alice's age) → inner join removes alice, left/outer join updates to "age unknown"
    • Edge case: Null join result — inner joiner returns null for age "0" → tombstones bob in inner join store, left/outer join keep bob with "age 0"
  7. shouldMergeStreamsWithHeaders
    Ops: Two tables (materialized) → toStream → merge → to

    • Edge case: Tombstone → deletes key from its source store; other store unaffected
  8. shouldTransformValuesWithHeaders
    Ops: table (materialized) → transformValues (unmaterialized, uses ValueGetter) → to

    • Edge case: Tombstone → deletes key from source store include implementation strategy.

Checklist

Please answer the questions with Y, N or N/A if not applicable.

  • [ ] Contains customer facing changes? Including API/behavior changes
  • [ ] Is this change gated behind config(s)?
    • List the config(s) needed to be set to enable this change
  • [ ] Did you add sufficient unit test and/or integration test coverage for this PR?
    • If not, please explain why it is not required
  • [ ] Does this change require modifying existing system tests or adding new system tests?
    • If so, include tracking information for the system test changes
  • [ ] Must this be released together with other change(s), either in this repo or another one?
    • If so, please include the link(s) to the changes that must be released together

References

JIRA:

Test & Review

Open questions / Follow-ups

@lucliu1108 Lucy Liu (lucliu1108) requested a review from a team as a code owner March 20, 2026 16:06
Copilot AI review requested due to automatic review settings March 20, 2026 16:06
@confluent-cla-assistant
Copy link
Copy Markdown

🎉 All Contributor License Agreements have been signed. Ready to merge.
Please push an empty commit if you would like to re-run the checks to verify CLA status for all contributors.

Copy link
Copy Markdown

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Adds new DSL-focused integration tests to validate header-based SchemaId propagation with TimestampedKeyValueStoreWithHeaders, while removing the older stateless header schema ID integration test and adjusting test dependencies.

Changes:

  • Added TimestampedKeyValueStoreWithHeadersDslIntegrationTest covering groupBy/count and KTable.filter with headers-aware stores + IQv1 querying.
  • Removed KafkaStreamsHeaderSchemaIdIntegrationTest (stateless filter test).
  • Updated streams-integration-tests Maven deps to Kafka 4.3.0-SNAPSHOT artifacts (clients/streams/test-utils).

Reviewed changes

Copilot reviewed 3 out of 3 changed files in this pull request and generated 9 comments.

File Description
streams-integration-tests/src/test/java/io/confluent/kafka/streams/integration/dsl/TimestampedKeyValueStoreWithHeadersDslIntegrationTest.java New integration test verifying SchemaId headers are preserved in headers-aware state stores + IQv1 queries
streams-integration-tests/src/test/java/io/confluent/kafka/streams/integration/KafkaStreamsHeaderSchemaIdIntegrationTest.java Removed older stateless header-schema-id integration test
streams-integration-tests/pom.xml Pins Kafka test dependencies to 4.3.0-SNAPSHOT and adds test-utils

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

assertKeySchemaIdHeader(streamsResult.headers(), "IQv1 get streams");

ValueTimestampHeaders<Long> helloResult = store.get(createKey("hello"));
assertNotNull(streamsResult, "IQv1: hello should exist in store");
Copy link

Copilot AI Mar 20, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

assertNotNull is checking streamsResult instead of helloResult, which can mask failures (or fail for the wrong reason). Update the assertion to validate helloResult is non-null.

Suggested change
assertNotNull(streamsResult, "IQv1: hello should exist in store");
assertNotNull(helloResult, "IQv1: hello should exist in store");

Copilot uses AI. Check for mistakes.
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree

Comment on lines +237 to +246
List<ConsumerRecord<GenericRecord, GenericRecord>> results =
consumeRecords(outputTopic, "dsl-filter-consumer", 2);

assertEquals(2, results.size(), "Only the long lines should pass filter");
assertEquals("long", results.get(0).key().get("word").toString());
assertEquals("this is a long long line", results.get(0).value().get("line").toString());
assertSchemaIdHeaders(results.get(0).headers(), "filter output");
assertEquals("long2", results.get(1).key().get("word").toString());
assertEquals("this is another long line", results.get(1).value().get("line").toString());
assertSchemaIdHeaders(results.get(1).headers(), "filter output2");
Copy link

Copilot AI Mar 20, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This topology uses KTable#filter, which preserves table semantics and can emit tombstones (null values) for records that do not satisfy the predicate. With the current assertions, a tombstone for "short" may be consumed and either (a) make results.size() exceed 2 or (b) cause results.get(i).value() to be null, making the test flaky. Consider either filtering out tombstones when consuming/asserting (e.g., only keep records with non-null values), or update the expected output to include and validate the tombstone, or switch to a KStream filter if the intent is “drop” semantics.

Copilot uses AI. Check for mistakes.
Comment on lines +270 to +273
private void createTopics(String... topicNames) throws Exception {
Properties adminProps = new Properties();
adminProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
try (AdminClient admin = AdminClient.create(adminProps)) {
Copy link

Copilot AI Mar 20, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AdminClient configuration should use AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG rather than ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG. While the key string is the same today, using the Admin constant avoids confusing config intent and matches client-specific conventions.

Copilot uses AI. Check for mistakes.

package io.confluent.kafka.streams.integration.dsl;

import static org.apache.kafka.streams.KeyValue.pair;
Copy link

Copilot AI Mar 20, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These imports appear unused within this new test file. Removing them reduces noise and avoids checkstyle/spotbugs issues in builds that enforce unused-import rules.

Suggested change
import static org.apache.kafka.streams.KeyValue.pair;

Copilot uses AI. Check for mistakes.
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

agree

import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
Copy link

Copilot AI Mar 20, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These imports appear unused within this new test file. Removing them reduces noise and avoids checkstyle/spotbugs issues in builds that enforce unused-import rules.

Suggested change
import org.apache.kafka.common.utils.Bytes;

Copilot uses AI. Check for mistakes.
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

agree

import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.state.KeyValueStore;
Copy link

Copilot AI Mar 20, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These imports appear unused within this new test file. Removing them reduces noise and avoids checkstyle/spotbugs issues in builds that enforce unused-import rules.

Suggested change
import org.apache.kafka.streams.state.KeyValueStore;

Copilot uses AI. Check for mistakes.
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

agree

}, Grouped.with(keySerde, Serdes.String()))
.count(Materialized.<GenericRecord, Long>as(
Stores.persistentTimestampedKeyValueStoreWithHeaders(storeName))
.withKeySerde(keySerde))
Copy link

Copilot AI Mar 20, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The materialized store config sets the key serde but not the value serde. For clarity and to avoid relying on internal/default serde inference for count(), set the value serde explicitly (e.g., withValueSerde(Serdes.Long())).

Suggested change
.withKeySerde(keySerde))
.withKeySerde(keySerde)
.withValueSerde(Serdes.Long()))

Copilot uses AI. Check for mistakes.
Comment on lines 68 to 86
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>4.3.0-SNAPSHOT</version>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>io.confluent.networking</groupId>
<artifactId>cc-custom-dns-resolver-java18</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>4.3.0-SNAPSHOT</version>
<classifier>test</classifier>
<scope>test</scope>
<exclusions>
Copy link

Copilot AI Mar 20, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pinning multiple Kafka artifacts to a *-SNAPSHOT version in-module can make CI/dev builds non-reproducible and fail depending on repository availability, and it increases the risk of dependency convergence/binary compatibility issues with the rest of the repo. Prefer aligning versions via the parent <dependencyManagement> (or a single ${kafka.version} property) and consider gating SNAPSHOT usage behind a Maven profile if it’s only required for specific CI runs. Also, having two kafka-clients deps (one normal + one test classifier) at different “coordinates” is valid, but it’s worth double-checking it’s required; if the goal is just exclusions, applying them only where needed can reduce classpath complexity.

Copilot uses AI. Check for mistakes.
}

/**
* Verifies `groupBy()` and `count()` works correctly use headers-aware stores.
Copy link

Copilot AI Mar 20, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Grammar fix: change “works correctly use” to “work correctly using” (also “groupBy() and count()” is plural).

Suggested change
* Verifies `groupBy()` and `count()` works correctly use headers-aware stores.
* Verifies `groupBy()` and `count()` work correctly using headers-aware stores.

Copilot uses AI. Check for mistakes.
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

agree

</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we have <artifactId>kafka-clients</artifactId> two times?

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks Lucy Liu (@lucliu1108). I left a comment. Also copilot caught some thing. Pleae check all of them. They seem valid to me.

assertKeySchemaIdHeader(streamsResult.headers(), "IQv1 get streams");

ValueTimestampHeaders<Long> helloResult = store.get(createKey("hello"));
assertNotNull(streamsResult, "IQv1: hello should exist in store");
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree


package io.confluent.kafka.streams.integration.dsl;

import static org.apache.kafka.streams.KeyValue.pair;
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

agree

import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

agree

import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.state.KeyValueStore;
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

agree

}

/**
* Verifies `groupBy()` and `count()` works correctly use headers-aware stores.
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

agree

wordKey.put("word", word);
return wordKey;
}, Grouped.with(keySerde, Serdes.String()))
.count(Materialized.<GenericRecord, Long>as(
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we want to test each operator separately, we should check the contents of the corresponding state store for each one. The safest approach is to verify both that the state store contains the expected schema IDs and that each operator’s output can be deserialized correctly. We should do this for both key and value, and also consider corner cases where the value is null or the key is null.

@lucliu1108 Lucy Liu (lucliu1108) changed the title [DRAFT] SchemaId header store DSL test SchemaId header store DSL test for TimestampedKeyValueStoreWithHeaders Apr 6, 2026
@lucliu1108 Lucy Liu (lucliu1108) changed the title SchemaId header store DSL test for TimestampedKeyValueStoreWithHeaders SchemaId header store DSL test for TimestampedKeyValueStoreWithHeaders and TimestampedWindowStoreWithHeader Apr 7, 2026
@lucliu1108 Lucy Liu (lucliu1108) changed the title SchemaId header store DSL test for TimestampedKeyValueStoreWithHeaders and TimestampedWindowStoreWithHeader SchemaId header store DSL test for TimestampedKeyValueStoreWithHeaders Apr 14, 2026
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks Lucy Liu (@lucliu1108) for the tests. I amde a pass.
Beside that, The body lists 8 tests, but the file has 12: Please update the description.

assertKeySchemaIdHeader(helloResult.headers(), "IQv1 get hello");

// Send a null value to a KStream with flatMapValues causes
// a NullPointerException, which crashes the streams instance into ERROR state.
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The only corner case this test exercises is "send null to flatMapValues → topology crashes to ERROR". That's a valid thing to pin down, but it kills the streams instance before any other corner case can run, so the count store's tombstone / decrement-style behavior is never exercised here — unlike the sibling aggregate and reduce tests, which both verify the "operator returns null → tombstone" path on their stores.
Two options:

  1. Reorder: run a real-world corner case first (e.g. an existing key receiving a tombstone update, or a key whose count drops back to 0), then send the poison-pill null last.
  2. Split into two tests: one for the happy path + tombstone, one for the NPE-crash, each with its own streams instance.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I split into 2 tests:

  1. shouldCrashOnNullValueInFlatMapValues — keeps the original pipeline (flatMapValues → groupBy → count) and exercises the NPE-crash corner case.
  2. shouldGroupCountWithHeaders— drops flatMapValues (groupByKey → count) and asserts that a null input value is skipped by count rather than tombstoning the existing entry.

assertEquals(KafkaStreams.State.ERROR, streams.state(),
"Streams should be in ERROR state after null value hits flatMapValues");

// Changelog verification (must run before streams crashes — use records already written)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With cachingEnabled = true and commit.interval.ms = 100 (set in createStreamsProps), there's a real window where the cache hasn't been flushed yet when the poison-pill null arrives, and the changelog topic could legitimately have fewer than 8 records. The >= 8 assertion can make the test flaky. on a loaded CI runner. Maybe force a flush before sending the null .

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i modified the structure to do the flush and changelog verification before the poison pill record is sent -- since the poison pill shouldn't be written into the changelog anyway.

*/
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void shouldMapValuesWithHeaders(boolean cachingEnabled) throws Exception {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test only covers the case where mapValues is unmaterialized — the result is read by the downstream to(...) via ValueGetter against the source table's store. So the IQv1 verification and the changelog verification both target sourceStoreName, never a mapValues output store. That leaves a real gap: when a user writes table.mapValues(fn, Materialized.as(Stores.persistentTimestampedKeyValueStoreWithHeaders(...))) the transformed value is what gets written to the store and the changelog, and the headers attached to that materialization are a separate code path. Nothing in the suite proves headers survive that.

Could you add a second case (or a sibling test) that materializes the mapValues result with persistentTimestampedKeyValueStoreWithHeaders, then verifies: - The mapValues output store via IQv1 — keys present, values are the transformed form, key-schema-id header attached. The mapValues output store's changelog — same checks plus tombstone propagation.

The Same gap exists in shouldTransformValuesWithHeaders.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added an extra param to test materialized/unmaterialized version of the mapValues and transformValues test.

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing tests:

  • globalTable() — uses a KV store, completely uncovered.
  • KStream.toTable(Materialized) as a primary subject (only used incidentally inside the disabled suppress test).
  • KStream.process() / processValues() with an attached KV state store (PAPI-on-DSL).
  • KStream.cogroup() / KTable.cogroup() → aggregate.
  • Stateless KStream.filter — the deleted KafkaStreamsHeaderSchemaIdIntegrationTest covered this; the new file's filter coverage is KTable-only.

createKey("hello"), (GenericRecord) null)).get();
producer.flush();
}
Thread.sleep(2000);
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thread.sleep(...) as a sync barrier in many places. Replace with TestUtils.waitForCondition(...). Sleeps are flaky on loaded CI.

List<ConsumerRecord<GenericRecord, GenericRecord>> results =
consumeRecords(outputTopic, "dsl-aggregate-consumer" + suffix, maxExpected);

assertTrue(results.size() >= minExpected && results.size() <= maxExpected,
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

assertTrue(results.size() >= N, ...) patterns at many places mask under-counts — the test then Map.get(key) and silently returns null on a missing record. After deterministic waits, switch to assertEquals(N, ...).

props.put(StreamsConfig.APPLICATION_ID_CONFIG, appId);
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
props.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, restApp.restConnect);
// props.put(StreamsConfig.DSL_STORE_FORMAT_CONFIG, StreamsConfig.DSL_STORE_FORMAT_HEADERS);
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

??

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Delete or document why it's not used.


private Properties createStreamsProps(String appId, boolean cachingEnabled) {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, appId);
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed applicationId per test. streams.cleanUp() helps, but adding UUID.randomUUID() would protect against rerun collisions on local state-store dirs.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added a randomUUID for state-store dir creation.

}
}

private List<ConsumerRecord<GenericRecord, Long>> consumeLongValueRecords(
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Near-duplicate of other 2 consume record helpers. Consolidate into one generic helper parameterized on the value deserializer.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove the other 2, only keep a generic helper

*/
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void shouldTransformValuesWithHeaders(boolean cachingEnabled) throws Exception {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

only the unmaterialized form is tested (reads source store via ValueGetter). The materialized-output variant — mapValues(fn, Materialized.as(...withHeaders...)) / transformValues(... Materialized.as(...withHeaders...)) — is never exercised, so headers on the transformed store + its changelog aren't verified.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added its materialized version + changelog verification.

*/
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void shouldFilterAndFilterNotWithHeaders(boolean cachingEnabled) throws Exception {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

only filterStoreName-changelog is consumed; the source-table changelog (where the upstream tombstone first lands) is not.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added verification for source table changelog.

*/
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void shouldMergeStreamsWithHeaders(boolean cachingEnabled) throws Exception {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

only storeName1-changelog; storeName2-changelog missing. Tombstone is also sent on inputTopic1 only needs a symmetric one on inputTopic2

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a tombstone on inputTopic2, and verification on storeName2-changelog.

*/
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void shouldPrefixScanWithHeaders(boolean cachingEnabled) throws Exception {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

happy path only. No tombstone path, no empty-prefix scan, no missing-prefix scan, no multi-result scan (e.g. prefix "k" should return both ka and kb), no boundary keys.

Copy link
Copy Markdown
Member Author

@lucliu1108 Lucy Liu (lucliu1108) Apr 25, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

multi-result doesn't work as expected, since Avro uses variable-length encoding, its key first encodes length of the string, then the string itself(code). So prefixScan in this case could only do exact match to the string.

*/
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void shouldForeignKeyJoinWithHeaders(boolean cachingEnabled) throws Exception {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

happy path only. No FK tombstone, no FK re-routing, no null-joiner-result. Changelog block only checks the join-result store, not the names/ages source-store changelogs and not the internal subscription changelogs.

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing tests:

  • globalTable() — uses a KV store, completely uncovered.
  • KStream.toTable(Materialized) as a primary subject (only used incidentally inside the disabled suppress test).
  • KStream.process() / processValues() with an attached KV state store (PAPI-on-DSL).
  • KStream.cogroup() / KTable.cogroup() → aggregate.
  • Stateless KStream.filter — the deleted KafkaStreamsHeaderSchemaIdIntegrationTest covered this; the new file's filter coverage is KTable-only.

# Please enter a commit message to explain why this merge is necessary,
# especially if it merges an updated upstream into a topic branch.
#
# Lines starting with '#' will be ignored, and an empty message aborts
# the commit.
@lucliu1108
Copy link
Copy Markdown
Member Author

Lucy Liu (lucliu1108) commented Apr 27, 2026

Alieh Saeedi (@aliehsaeedii) Thanks for the review
I added:

  • globalTable() shouldGlobalTableWithHeaders
  • KStream.toTable(Materialized) shouldStreamToTableWithHeaders
  • KStream.process() / processValues() shouldProcessWithHeader and shouldProcessValuesWithHeaders
  • KStream.cogroup() shouldKStreamCogroupWithHeaders, there's no KTable.cogroup() so i omitted.
  • Stateless KStream.filter shouldFilterAndFilterNotKStreamStatelessWithHeaders. The original test is in one of the PAPI PRs, it was reorged and renamed so i delete it here. seems like we don't need it as a standalone one anymore.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants