SchemaId header store DSL test for SessionStoreWIthHeaders#4250
SchemaId header store DSL test for SessionStoreWIthHeaders#4250Lucy Liu (lucliu1108) wants to merge 36 commits into
Conversation
|
🎉 All Contributor License Agreements have been signed. Ready to merge. |
There was a problem hiding this comment.
Pull request overview
This PR adds a new Kafka Streams integration test suite to validate header-based Schema ID propagation when using SessionStoreWithHeaders materializations (covering count, reduce, aggregate, and suppress) against the embedded Schema Registry test harness, and removes the prior stateless header-schema-id integration test.
Changes:
- Add
SessionStoreWithHeadersDslIntegrationTestwith parameterized runs across caching/grace combinations. - Remove
KafkaStreamsHeaderSchemaIdIntegrationTest(stateless filter coverage). - Update
streams-integration-teststest dependencies (Kafka + JUnit params).
Reviewed changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 8 comments.
| File | Description |
|---|---|
streams-integration-tests/src/test/java/io/confluent/kafka/streams/integration/dsl/SessionStoreWithHeadersDslIntegrationTest.java |
New integration tests for SessionStoreWithHeaders verifying schema-id headers and changelog behavior across DSL ops. |
streams-integration-tests/src/test/java/io/confluent/kafka/streams/integration/KafkaStreamsHeaderSchemaIdIntegrationTest.java |
Removes older stateless header-schema-id integration test. |
streams-integration-tests/pom.xml |
Pins Kafka test deps to 4.4.0-SNAPSHOT and adds kafka-streams-test-utils + junit-jupiter-params. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| Thread.sleep(5000); | ||
|
|
||
| ReadOnlySessionStore<GenericRecord, AggregationWithHeaders<Long>> store = streams.store( | ||
| StoreQueryParameters.fromNameAndType(storeName, new SessionStoreWithHeadersType<>())); |
There was a problem hiding this comment.
These tests rely on fixed Thread.sleep(...) delays to wait for processing (e.g., before querying the state store). This is prone to flakiness under CI load; please replace the sleeps with polling/waiting on the expected condition with a bounded timeout (for example using org.apache.kafka.test.TestUtils.waitForCondition from the kafka-streams-test-utils dependency).
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 3 out of 3 changed files in this pull request and generated 4 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| Thread.sleep(5000); | ||
|
|
||
| ReadOnlySessionStore<GenericRecord, AggregationWithHeaders<Long>> store = streams.store( | ||
| StoreQueryParameters.fromNameAndType(storeName, new SessionStoreWithHeadersType<>())); | ||
|
|
||
| try (KeyValueIterator<Windowed<GenericRecord>, AggregationWithHeaders<Long>> it = store.fetch(createKey("kafka"))) { | ||
| assertTrue(it.hasNext()); | ||
| KeyValue<Windowed<GenericRecord>, AggregationWithHeaders<Long>> next = it.next(); | ||
| assertEquals(2L, next.value.aggregation()); | ||
| assertEquals(baseTime, next.key.window().start()); | ||
| assertEquals(baseTime + 5000, next.key.window().end()); | ||
| assertKeySchemaIdHeader(next.value.headers(), "Merged session"); | ||
| assertFalse(it.hasNext()); | ||
| } |
There was a problem hiding this comment.
The test relies on fixed Thread.sleep(...) delays to wait for processing, which is both slow (this runs 4× per parameter set) and can be flaky under CI load. Prefer polling-based waits (e.g., kafka-streams-test-utils TestUtils.waitForCondition, or a small loop that queries the store/output until the assertion becomes true with a timeout) so the test proceeds as soon as results are available.
| Thread.sleep(5000); | |
| ReadOnlySessionStore<GenericRecord, AggregationWithHeaders<Long>> store = streams.store( | |
| StoreQueryParameters.fromNameAndType(storeName, new SessionStoreWithHeadersType<>())); | |
| try (KeyValueIterator<Windowed<GenericRecord>, AggregationWithHeaders<Long>> it = store.fetch(createKey("kafka"))) { | |
| assertTrue(it.hasNext()); | |
| KeyValue<Windowed<GenericRecord>, AggregationWithHeaders<Long>> next = it.next(); | |
| assertEquals(2L, next.value.aggregation()); | |
| assertEquals(baseTime, next.key.window().start()); | |
| assertEquals(baseTime + 5000, next.key.window().end()); | |
| assertKeySchemaIdHeader(next.value.headers(), "Merged session"); | |
| assertFalse(it.hasNext()); | |
| } | |
| ReadOnlySessionStore<GenericRecord, AggregationWithHeaders<Long>> store = streams.store( | |
| StoreQueryParameters.fromNameAndType(storeName, new SessionStoreWithHeadersType<>())); | |
| final long waitDeadline = System.nanoTime() + Duration.ofSeconds(30).toNanos(); | |
| AssertionError lastAssertionError = null; | |
| boolean mergedSessionVerified = false; | |
| while (System.nanoTime() < waitDeadline && !mergedSessionVerified) { | |
| try (KeyValueIterator<Windowed<GenericRecord>, AggregationWithHeaders<Long>> it = store.fetch(createKey("kafka"))) { | |
| assertTrue(it.hasNext()); | |
| KeyValue<Windowed<GenericRecord>, AggregationWithHeaders<Long>> next = it.next(); | |
| assertEquals(2L, next.value.aggregation()); | |
| assertEquals(baseTime, next.key.window().start()); | |
| assertEquals(baseTime + 5000, next.key.window().end()); | |
| assertKeySchemaIdHeader(next.value.headers(), "Merged session"); | |
| assertFalse(it.hasNext()); | |
| mergedSessionVerified = true; | |
| } catch (AssertionError e) { | |
| lastAssertionError = e; | |
| Thread.sleep(100); | |
| } | |
| } | |
| if (!mergedSessionVerified) { | |
| if (lastAssertionError != null) { | |
| throw lastAssertionError; | |
| } | |
| assertTrue(false, "Timed out waiting for merged session to be materialized"); | |
| } |
Summary
This PR includes integration tests for SessionStoreWithHeaders using schema registry. The operations tested include:
What
Checklist
Please answer the questions with Y, N or N/A if not applicable.
References
JIRA:
Test & Review
Open questions / Follow-ups