SchemaId header store DSL test for TimestampedWindowStoreWithHeader #4251
Lucy Liu (lucliu1108) wants to merge 41 commits into
🎉 All Contributor License Agreements have been signed. Ready to merge.
Pull request overview
Adds DSL-level integration coverage to validate SchemaId header propagation through windowed/stateful Kafka Streams operations backed by header-aware timestamped window stores.
Changes:
- Introduces a new parameterized DSL integration test suite for TimestampedWindowStoreWithHeaders, covering count/aggregate/reduce/hopping windows, stream-stream join, and suppression.
- Removes the prior stateless header-schema-id integration test.
- Updates the streams-integration-tests Maven module dependencies (Kafka artifacts + JUnit params).
Reviewed changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 9 comments.
| File | Description |
|---|---|
| streams-integration-tests/src/test/java/io/confluent/kafka/streams/integration/dsl/TimestampedWindowStoreWithHeadersDslIntegrationTest.java | New DSL integration tests validating schema-id headers via IQv1 queries and changelog consumption across cache/grace combinations. |
| streams-integration-tests/src/test/java/io/confluent/kafka/streams/integration/KafkaStreamsHeaderSchemaIdIntegrationTest.java | Removes the previous stateless filter-based header propagation integration test. |
| streams-integration-tests/pom.xml | Adds JUnit params and modifies Kafka dependency versions / adds streams test utils. |
    Thread.sleep(2000); // Wait for records to be buffered
This fixed sleep is used to wait for the join results, but fixed delays are a common source of flaky integration tests (too short under load, too long when fast). Prefer waiting on a condition (e.g., poll loop until 2 output records are observed or until a timeout) instead of Thread.sleep.
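To illustrate the condition-based wait suggested above, here is a minimal sketch in plain Java. The class `ConditionWaiter` and its method `waitForCondition` are hypothetical names introduced for this example and are not part of the PR or of any Kafka API; the test would supply a condition such as "two output records have been collected".

```java
import java.time.Duration;
import java.time.Instant;
import java.util.function.BooleanSupplier;

// Hypothetical helper for illustration only; not taken from the PR.
final class ConditionWaiter {
    private ConditionWaiter() {}

    /**
     * Re-checks the condition until it holds or the timeout elapses.
     * Returns true if the condition was met, false if the deadline passed first.
     */
    static boolean waitForCondition(BooleanSupplier condition,
                                    Duration timeout,
                                    Duration pollInterval) throws InterruptedException {
        final Instant deadline = Instant.now().plus(timeout);
        while (true) {
            if (condition.getAsBoolean()) {
                return true; // return as soon as the condition holds
            }
            final Duration remaining = Duration.between(Instant.now(), deadline);
            if (remaining.isZero() || remaining.isNegative()) {
                return false; // deadline reached without success
            }
            // Sleep for the poll interval, but never past the deadline.
            final long sleepMillis = Math.min(pollInterval.toMillis(), remaining.toMillis());
            Thread.sleep(Math.max(1L, sleepMillis));
        }
    }
}
```

Unlike a fixed `Thread.sleep(2000)`, a fast run returns as soon as the condition holds and a slow run gets the full timeout; the caller asserts on the returned boolean so a timeout fails the test with a clear message.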
    List<ConsumerRecord<GenericRecord, GenericRecord>> outputRecords = new ArrayList<>();
    try (KafkaConsumer<GenericRecord, GenericRecord> consumer = new KafkaConsumer<>(consumerProps, keySerde.deserializer(), aggSerde.deserializer())) {
        consumer.subscribe(Collections.singletonList(outputTopic));
        ConsumerRecords<GenericRecord, GenericRecord> records = consumer.poll(java.time.Duration.ofSeconds(10));
        records.forEach(outputRecords::add);
    }

    assertEquals(2, outputRecords.size(), "Should have 2 join results (kafka and streams)");
The join output consumption does a single poll(Duration.ofSeconds(10)) and immediately asserts on the number of records. This is prone to flakes (records can arrive after the first poll). Prefer polling in a loop until you collect the expected number of records (or a deadline) similar to other tests in the repo that consume N messages.
Suggested change

    - List<ConsumerRecord<GenericRecord, GenericRecord>> outputRecords = new ArrayList<>();
    - try (KafkaConsumer<GenericRecord, GenericRecord> consumer = new KafkaConsumer<>(consumerProps, keySerde.deserializer(), aggSerde.deserializer())) {
    -     consumer.subscribe(Collections.singletonList(outputTopic));
    -     ConsumerRecords<GenericRecord, GenericRecord> records = consumer.poll(java.time.Duration.ofSeconds(10));
    -     records.forEach(outputRecords::add);
    - }
    - assertEquals(2, outputRecords.size(), "Should have 2 join results (kafka and streams)");
    + final int expectedOutputRecordCount = 2;
    + final Instant pollDeadline = Instant.now().plus(Duration.ofSeconds(10));
    + List<ConsumerRecord<GenericRecord, GenericRecord>> outputRecords = new ArrayList<>();
    + try (KafkaConsumer<GenericRecord, GenericRecord> consumer = new KafkaConsumer<>(consumerProps, keySerde.deserializer(), aggSerde.deserializer())) {
    +     consumer.subscribe(Collections.singletonList(outputTopic));
    +     while (outputRecords.size() < expectedOutputRecordCount && Instant.now().isBefore(pollDeadline)) {
    +         ConsumerRecords<GenericRecord, GenericRecord> records = consumer.poll(Duration.ofMillis(500));
    +         records.forEach(outputRecords::add);
    +     }
    + }
    + assertEquals(expectedOutputRecordCount, outputRecords.size(), "Should have 2 join results (kafka and streams)");
Pull request overview
Copilot reviewed 3 out of 3 changed files in this pull request and generated 5 comments.
Summary
Add DSL-level integration tests for TimestampedWindowStoreWithHeaders to verify that schema ID headers are correctly propagated through windowed Kafka Streams operations backed by header-aware state stores.
Each test contains
Operations
What
Checklist
Please answer the questions with Y, N or N/A if not applicable.
References
JIRA:
Test & Review
Open questions / Follow-ups