SchemaId header store DSL test for TimestampedWindowStoreWithHeader #4251

Open
Lucy Liu (lucliu1108) wants to merge 41 commits into confluentinc:master from lucliu1108:dsl-test-window-store

@lucliu1108
Member

Summary

Add DSL-level integration tests for TimestampedWindowStoreWithHeaders to verify that schema ID headers are correctly propagated through windowed Kafka Streams operations backed by header-aware state stores.

Each test contains

  • parameterization across cached/uncached and with/without grace period scenarios
  • IQv1 state store queries
  • Changelog header validation
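
A minimal sketch of the cache/grace parameter matrix described above, written in plain Java so it stands alone. The class, field, and method names are hypothetical and not taken from this PR; in a JUnit 5 suite, a list like this would typically be exposed through a `@MethodSource` provider for a `@ParameterizedTest`.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative only: enumerates the cache x grace-period combinations. */
final class StoreTestMatrixSketch {

    /** One parameter combination; the field names are hypothetical. */
    static final class Params {
        final boolean cachingEnabled;
        final boolean withGracePeriod;

        Params(boolean cachingEnabled, boolean withGracePeriod) {
            this.cachingEnabled = cachingEnabled;
            this.withGracePeriod = withGracePeriod;
        }

        @Override
        public String toString() {
            return "caching=" + cachingEnabled + ", grace=" + withGracePeriod;
        }
    }

    /** Returns all four combinations, cached variants first. */
    static List<Params> allCombinations() {
        List<Params> combos = new ArrayList<>();
        for (boolean caching : new boolean[] {true, false}) {
            for (boolean grace : new boolean[] {true, false}) {
                combos.add(new Params(caching, grace));
            }
        }
        return combos;
    }

    public static void main(String[] args) {
        // Print each combination, one per line.
        allCombinations().forEach(System.out::println);
    }
}
```

Running every test against each combination keeps cache-dependent and grace-dependent behavior from being covered only by accident.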

Operations

  • Count (tumbling windows) — shouldCountWithTumblingWindows, with null handling
  • Aggregate (tumbling windows) — shouldAggregateWithTumblingWindows, with null handling
  • Reduce (tumbling windows) — shouldReduceWithTumblingWindows
  • Count (hopping windows) — shouldCountWithHoppingWindows
  • Stream-Stream Join — shouldStreamStreamsJoinWithHeaders
  • Suppress — shouldSuppressWithHeaders, with null handling
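
For readers less familiar with the two window types named above, the sketch below shows how a record timestamp maps to window start times. It is a plain-Java illustration of the semantics, not the Kafka Streams implementation; the class and method names are hypothetical, and it assumes non-negative millisecond timestamps and an advance no larger than the window size.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative only: computes window start times for a record timestamp. */
final class WindowAssignmentSketch {

    /** Tumbling windows: each timestamp belongs to exactly one window. */
    static long tumblingWindowStart(long timestampMs, long sizeMs) {
        return timestampMs - (timestampMs % sizeMs);
    }

    /**
     * Hopping windows: a timestamp belongs to every window
     * [start, start + sizeMs) whose start is a multiple of advanceMs.
     */
    static List<Long> hoppingWindowStarts(long timestampMs, long sizeMs, long advanceMs) {
        List<Long> starts = new ArrayList<>();
        // Earliest aligned start whose window can still contain the timestamp.
        long start = Math.max(0, timestampMs - sizeMs + advanceMs) / advanceMs * advanceMs;
        for (; start <= timestampMs; start += advanceMs) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        // 5 s windows; the hopping variant advances every 2 s.
        System.out.println(tumblingWindowStart(12_500L, 5_000L));
        System.out.println(hoppingWindowStarts(12_500L, 5_000L, 2_000L));
    }
}
```

With a 5-second window and a 2-second advance, a record at 12,500 ms falls into three hopping windows but only one tumbling window, so a hopping-window count generally updates several windowed keys per input record.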

What

Checklist

Please answer the questions with Y, N or N/A if not applicable.

  • [ ] Contains customer facing changes? Including API/behavior changes
  • [ ] Is this change gated behind config(s)?
    • List the config(s) needed to be set to enable this change
  • [ ] Did you add sufficient unit test and/or integration test coverage for this PR?
    • If not, please explain why it is not required
  • [ ] Does this change require modifying existing system tests or adding new system tests?
    • If so, include tracking information for the system test changes
  • [ ] Must this be released together with other change(s), either in this repo or another one?
    • If so, please include the link(s) to the changes that must be released together

References

JIRA:

Test & Review

Open questions / Follow-ups

@lucliu1108 Lucy Liu (lucliu1108) requested a review from a team as a code owner April 14, 2026 17:15
Copilot AI review requested due to automatic review settings April 14, 2026 17:15
@confluent-cla-assistant

🎉 All Contributor License Agreements have been signed. Ready to merge.
Please push an empty commit if you would like to re-run the checks to verify CLA status for all contributors.

Copilot AI left a comment

Pull request overview

Adds DSL-level integration coverage to validate SchemaId header propagation through windowed/stateful Kafka Streams operations backed by header-aware timestamped window stores.

Changes:

  • Introduces a new parameterized DSL integration test suite for TimestampedWindowStoreWithHeaders, covering count/aggregate/reduce/hopping windows, stream-stream join, and suppression.
  • Removes the prior stateless header-schema-id integration test.
  • Updates the streams-integration-tests Maven module dependencies (Kafka artifacts + JUnit params).

Reviewed changes

Copilot reviewed 3 out of 3 changed files in this pull request and generated 9 comments.

| File | Description |
| --- | --- |
| streams-integration-tests/src/test/java/io/confluent/kafka/streams/integration/dsl/TimestampedWindowStoreWithHeadersDslIntegrationTest.java | New DSL integration tests validating schema-id headers via IQv1 queries and changelog consumption across cache/grace combinations. |
| streams-integration-tests/src/test/java/io/confluent/kafka/streams/integration/KafkaStreamsHeaderSchemaIdIntegrationTest.java | Removes the previous stateless filter-based header propagation integration test. |
| streams-integration-tests/pom.xml | Adds JUnit params and modifies Kafka dependency versions / adds streams test utils. |

Comment thread streams-integration-tests/pom.xml
Comment thread streams-integration-tests/pom.xml
Comment thread streams-integration-tests/pom.xml
Comment on lines +739 to +740
Thread.sleep(2000); // Wait for records to be buffered

Copilot AI Apr 14, 2026

This fixed sleep is used to wait for the join results, but fixed delays are a common source of flaky integration tests (too short under load, too long when fast). Prefer waiting on a condition (e.g., poll loop until 2 output records are observed or until a timeout) instead of Thread.sleep.
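
One way to express "wait on a condition" is a small polling helper like the sketch below. The helper name and signature are assumptions for illustration, not code from this PR or an existing utility in the repo.

```java
import java.time.Duration;
import java.util.function.BooleanSupplier;

/** Illustrative only: polls a condition until it holds or a timeout elapses. */
final class WaitSketch {

    /**
     * Returns true as soon as the condition holds, or false if the timeout
     * elapses (or the thread is interrupted) first.
     */
    static boolean waitForCondition(BooleanSupplier condition,
                                    Duration timeout,
                                    Duration pollInterval) {
        final long deadlineNanos = System.nanoTime() + timeout.toNanos();
        while (true) {
            if (condition.getAsBoolean()) {
                return true;
            }
            long remainingNanos = deadlineNanos - System.nanoTime();
            if (remainingNanos <= 0) {
                return false;
            }
            // Sleep for the poll interval, but never past the deadline.
            long sleepMs = Math.min(pollInterval.toMillis(),
                                    Math.max(1L, remainingNanos / 1_000_000L));
            try {
                Thread.sleep(sleepMs);
            } catch (InterruptedException e) {
                // Restore the interrupt flag and give up waiting.
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }
}
```

In the join test, the condition would be something like "the consumer has collected 2 output records", with the timeout acting only as an upper bound rather than a fixed delay.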

Copilot uses AI. Check for mistakes.
Comment on lines +745 to +752
List<ConsumerRecord<GenericRecord, GenericRecord>> outputRecords = new ArrayList<>();
try (KafkaConsumer<GenericRecord, GenericRecord> consumer = new KafkaConsumer<>(consumerProps, keySerde.deserializer(), aggSerde.deserializer())) {
    consumer.subscribe(Collections.singletonList(outputTopic));
    ConsumerRecords<GenericRecord, GenericRecord> records = consumer.poll(java.time.Duration.ofSeconds(10));
    records.forEach(outputRecords::add);
}

assertEquals(2, outputRecords.size(), "Should have 2 join results (kafka and streams)");

Copilot AI Apr 14, 2026

The join output consumption does a single poll(Duration.ofSeconds(10)) and immediately asserts on the number of records. This is prone to flakes (records can arrive after the first poll). Prefer polling in a loop until you collect the expected number of records (or a deadline) similar to other tests in the repo that consume N messages.

Suggested change
- List<ConsumerRecord<GenericRecord, GenericRecord>> outputRecords = new ArrayList<>();
- try (KafkaConsumer<GenericRecord, GenericRecord> consumer = new KafkaConsumer<>(consumerProps, keySerde.deserializer(), aggSerde.deserializer())) {
-     consumer.subscribe(Collections.singletonList(outputTopic));
-     ConsumerRecords<GenericRecord, GenericRecord> records = consumer.poll(java.time.Duration.ofSeconds(10));
-     records.forEach(outputRecords::add);
- }
- assertEquals(2, outputRecords.size(), "Should have 2 join results (kafka and streams)");
+ final int expectedOutputRecordCount = 2;
+ final Instant pollDeadline = Instant.now().plus(Duration.ofSeconds(10));
+ List<ConsumerRecord<GenericRecord, GenericRecord>> outputRecords = new ArrayList<>();
+ try (KafkaConsumer<GenericRecord, GenericRecord> consumer = new KafkaConsumer<>(consumerProps, keySerde.deserializer(), aggSerde.deserializer())) {
+     consumer.subscribe(Collections.singletonList(outputTopic));
+     while (outputRecords.size() < expectedOutputRecordCount && Instant.now().isBefore(pollDeadline)) {
+         ConsumerRecords<GenericRecord, GenericRecord> records = consumer.poll(Duration.ofMillis(500));
+         records.forEach(outputRecords::add);
+     }
+ }
+ assertEquals(expectedOutputRecordCount, outputRecords.size(), "Should have 2 join results (kafka and streams)");

Copilot AI left a comment

Pull request overview

Copilot reviewed 3 out of 3 changed files in this pull request and generated 5 comments.


Comment thread streams-integration-tests/pom.xml Outdated

2 participants