Skip to content

SchemaId header store DSL test for SessionStoreWIthHeaders#4250

Open
Lucy Liu (lucliu1108) wants to merge 36 commits into
confluentinc:masterfrom
lucliu1108:dsl-test-session-store
Open

SchemaId header store DSL test for SessionStoreWIthHeaders#4250
Lucy Liu (lucliu1108) wants to merge 36 commits into
confluentinc:masterfrom
lucliu1108:dsl-test-session-store

Conversation

@lucliu1108
Copy link
Copy Markdown
Member

Summary

This PR includes integration tests for SessionStoreWithHeaders using schema registry. The operations tested include:

  • groupBy
  • count
  • reduce
  • aggregate
  • suppress

What

Checklist

Please answer the questions with Y, N or N/A if not applicable.

  • [ ] Contains customer facing changes? Including API/behavior changes
  • [ ] Is this change gated behind config(s)?
    • List the config(s) needed to be set to enable this change
  • [ ] Did you add sufficient unit test and/or integration test coverage for this PR?
    • If not, please explain why it is not required
  • [ ] Does this change require modifying existing system tests or adding new system tests?
    • If so, include tracking information for the system test changes
  • [ ] Must this be released together with other change(s), either in this repo or another one?
    • If so, please include the link(s) to the changes that must be released together

References

JIRA:

Test & Review

Open questions / Follow-ups

@lucliu1108 Lucy Liu (lucliu1108) requested a review from a team as a code owner April 14, 2026 15:11
Copilot AI review requested due to automatic review settings April 14, 2026 15:11
@confluent-cla-assistant
Copy link
Copy Markdown

🎉 All Contributor License Agreements have been signed. Ready to merge.
Please push an empty commit if you would like to re-run the checks to verify CLA status for all contributors.

Copy link
Copy Markdown

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR adds a new Kafka Streams integration test suite to validate header-based Schema ID propagation when using SessionStoreWithHeaders materializations (covering count, reduce, aggregate, and suppress) against the embedded Schema Registry test harness, and removes the prior stateless header-schema-id integration test.

Changes:

  • Add SessionStoreWithHeadersDslIntegrationTest with parameterized runs across caching/grace combinations.
  • Remove KafkaStreamsHeaderSchemaIdIntegrationTest (stateless filter coverage).
  • Update streams-integration-tests test dependencies (Kafka + JUnit params).

Reviewed changes

Copilot reviewed 3 out of 3 changed files in this pull request and generated 8 comments.

File Description
streams-integration-tests/src/test/java/io/confluent/kafka/streams/integration/dsl/SessionStoreWithHeadersDslIntegrationTest.java New integration tests for SessionStoreWithHeaders verifying schema-id headers and changelog behavior across DSL ops.
streams-integration-tests/src/test/java/io/confluent/kafka/streams/integration/KafkaStreamsHeaderSchemaIdIntegrationTest.java Removes older stateless header-schema-id integration test.
streams-integration-tests/pom.xml Pins Kafka test deps to 4.4.0-SNAPSHOT and adds kafka-streams-test-utils + junit-jupiter-params.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment on lines +161 to +164
Thread.sleep(5000);

ReadOnlySessionStore<GenericRecord, AggregationWithHeaders<Long>> store = streams.store(
StoreQueryParameters.fromNameAndType(storeName, new SessionStoreWithHeadersType<>()));
Copy link

Copilot AI Apr 14, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These tests rely on fixed Thread.sleep(...) delays to wait for processing (e.g., before querying the state store). This is prone to flakiness under CI load; please replace the sleeps with polling/waiting on the expected condition with a bounded timeout (for example using org.apache.kafka.test.TestUtils.waitForCondition from the kafka-streams-test-utils dependency).

Copilot uses AI. Check for mistakes.
Comment thread streams-integration-tests/pom.xml
Comment thread streams-integration-tests/pom.xml
Copy link
Copy Markdown

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 3 out of 3 changed files in this pull request and generated 4 comments.


💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment on lines +161 to +174
Thread.sleep(5000);

ReadOnlySessionStore<GenericRecord, AggregationWithHeaders<Long>> store = streams.store(
StoreQueryParameters.fromNameAndType(storeName, new SessionStoreWithHeadersType<>()));

try (KeyValueIterator<Windowed<GenericRecord>, AggregationWithHeaders<Long>> it = store.fetch(createKey("kafka"))) {
assertTrue(it.hasNext());
KeyValue<Windowed<GenericRecord>, AggregationWithHeaders<Long>> next = it.next();
assertEquals(2L, next.value.aggregation());
assertEquals(baseTime, next.key.window().start());
assertEquals(baseTime + 5000, next.key.window().end());
assertKeySchemaIdHeader(next.value.headers(), "Merged session");
assertFalse(it.hasNext());
}
Copy link

Copilot AI Apr 14, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The test relies on fixed Thread.sleep(...) delays to wait for processing, which is both slow (this runs 4× per parameter set) and can be flaky under CI load. Prefer polling-based waits (e.g., kafka-streams-test-utils TestUtils.waitForCondition, or a small loop that queries the store/output until the assertion becomes true with a timeout) so the test proceeds as soon as results are available.

Suggested change
Thread.sleep(5000);
ReadOnlySessionStore<GenericRecord, AggregationWithHeaders<Long>> store = streams.store(
StoreQueryParameters.fromNameAndType(storeName, new SessionStoreWithHeadersType<>()));
try (KeyValueIterator<Windowed<GenericRecord>, AggregationWithHeaders<Long>> it = store.fetch(createKey("kafka"))) {
assertTrue(it.hasNext());
KeyValue<Windowed<GenericRecord>, AggregationWithHeaders<Long>> next = it.next();
assertEquals(2L, next.value.aggregation());
assertEquals(baseTime, next.key.window().start());
assertEquals(baseTime + 5000, next.key.window().end());
assertKeySchemaIdHeader(next.value.headers(), "Merged session");
assertFalse(it.hasNext());
}
ReadOnlySessionStore<GenericRecord, AggregationWithHeaders<Long>> store = streams.store(
StoreQueryParameters.fromNameAndType(storeName, new SessionStoreWithHeadersType<>()));
final long waitDeadline = System.nanoTime() + Duration.ofSeconds(30).toNanos();
AssertionError lastAssertionError = null;
boolean mergedSessionVerified = false;
while (System.nanoTime() < waitDeadline && !mergedSessionVerified) {
try (KeyValueIterator<Windowed<GenericRecord>, AggregationWithHeaders<Long>> it = store.fetch(createKey("kafka"))) {
assertTrue(it.hasNext());
KeyValue<Windowed<GenericRecord>, AggregationWithHeaders<Long>> next = it.next();
assertEquals(2L, next.value.aggregation());
assertEquals(baseTime, next.key.window().start());
assertEquals(baseTime + 5000, next.key.window().end());
assertKeySchemaIdHeader(next.value.headers(), "Merged session");
assertFalse(it.hasNext());
mergedSessionVerified = true;
} catch (AssertionError e) {
lastAssertionError = e;
Thread.sleep(100);
}
}
if (!mergedSessionVerified) {
if (lastAssertionError != null) {
throw lastAssertionError;
}
assertTrue(false, "Timed out waiting for merged session to be materialized");
}

Copilot uses AI. Check for mistakes.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants