KStreams PAPI test for SessionStoreWithHeaders#4213
KStreams PAPI test for SessionStoreWithHeaders#4213Lucy Liu (lucliu1108) wants to merge 5 commits into
Conversation
|
🎉 All Contributor License Agreements have been signed. Ready to merge. |
There was a problem hiding this comment.
Pull request overview
Adds a Kafka Streams integration test suite to exercise SessionStoreWithHeaders behavior when using header-based Schema ID transport, including processor-driven store operations, IQv1 querying, and changelog tombstone header validation.
Changes:
- Introduces
SessionStoreWithHeadersIntegrationTestcovering many session-store operations and verifying schema-id headers on produced results. - Adds an IQv1 test path using a custom
QueryableStoreTypeforSessionStoreWithHeaders. - Updates
streams-integration-testsMaven dependencies (Kafka clients/streams and test-utils).
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 9 comments.
| File | Description |
|---|---|
| streams-integration-tests/src/test/java/io/confluent/kafka/streams/integration/SessionStoreWithHeadersIntegrationTest.java | New integration tests validating SessionStoreWithHeaders operations and header-based schema-id behavior (including IQv1 + changelog tombstones). |
| streams-integration-tests/pom.xml | Adjusts Kafka-related test dependencies/versions for the integration-tests module. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| // Consume outputs: 5 PUTs + 1 REMOVE = 6 records | ||
| consumeRecords(iqv1OutputTopic, "iqv1-session-consumer", 6); | ||
|
|
||
| // Query store via IQv1 | ||
| ReadOnlySessionStore<GenericRecord, AggregationWithHeaders<GenericRecord>> store = | ||
| streams.store( | ||
| StoreQueryParameters.fromNameAndType(iqv1StoreName, new SessionStoreWithHeadersType<>())); | ||
| assertNotNull(store, "Store should be accessible via IQv1"); |
There was a problem hiding this comment.
In shouldVerifyIQv1Operations, the call to consumeRecords(..., 6) ignores the returned list and does not assert that the expected 6 output records were actually produced/consumed before querying the store. If the poll loop times out early, the subsequent IQv1 queries can run before all inputs are processed, making the test flaky. Consider asserting the consumed record count (or otherwise waiting for processing) before performing the store queries.
| Windowed<GenericRecord> sessionKey = new Windowed<>( | ||
| record.key(), | ||
| new org.apache.kafka.streams.kstream.internals.SessionWindow(record.timestamp(), record.timestamp()) | ||
| ); |
There was a problem hiding this comment.
Another construction of a session window via the ...internals.SessionWindow class. Consider reusing the same helper for building Windowed<GenericRecord> keys so the internal dependency is localized and easier to update if Kafka changes the internal type.
| <groupId>org.apache.kafka</groupId> | ||
| <artifactId>kafka-streams-test-utils</artifactId> | ||
| <version>4.3.0-SNAPSHOT</version> | ||
| <scope>test</scope> | ||
| </dependency> | ||
| <dependency> |
There was a problem hiding this comment.
kafka-streams-test-utils is added as a direct dependency, but it doesn't appear to be used anywhere under streams-integration-tests/src/test/java (no imports/usages found). If it’s not required, consider removing it to keep the test classpath minimal; if it is required transitively, adding a comment explaining why would help future maintenance.
| <groupId>org.apache.kafka</groupId> | |
| <artifactId>kafka-streams-test-utils</artifactId> | |
| <version>4.3.0-SNAPSHOT</version> | |
| <scope>test</scope> | |
| </dependency> | |
| <dependency> |
| <dependency> | ||
| <groupId>org.apache.kafka</groupId> | ||
| <artifactId>kafka-clients</artifactId> | ||
| <version>4.3.0-SNAPSHOT</version> | ||
| <scope>test</scope> | ||
| <exclusions> | ||
| <exclusion> | ||
| <groupId>io.confluent.networking</groupId> | ||
| <artifactId>cc-custom-dns-resolver-java18</artifactId> | ||
| </exclusion> | ||
| </exclusions> | ||
| </dependency> | ||
| <dependency> | ||
| <groupId>org.apache.kafka</groupId> | ||
| <artifactId>kafka-clients</artifactId> | ||
| <version>4.3.0-SNAPSHOT</version> | ||
| <classifier>test</classifier> | ||
| <scope>test</scope> | ||
| <exclusions> | ||
| <exclusion> | ||
| <groupId>io.confluent.networking</groupId> | ||
| <artifactId>cc-custom-dns-resolver-java18</artifactId> | ||
| </exclusion> | ||
| </exclusions> | ||
| </dependency> | ||
| <dependency> | ||
| <groupId>org.apache.kafka</groupId> | ||
| <artifactId>kafka-streams</artifactId> | ||
| <version>4.3.0-SNAPSHOT</version> | ||
| <scope>test</scope> | ||
| <exclusions> | ||
| <exclusion> | ||
| <groupId>io.confluent.networking</groupId> | ||
| <artifactId>cc-custom-dns-resolver-java18</artifactId> | ||
| </exclusion> | ||
| </exclusions> | ||
| </dependency> | ||
| <dependency> | ||
| <groupId>org.apache.kafka</groupId> | ||
| <artifactId>kafka-streams-test-utils</artifactId> | ||
| <version>4.3.0-SNAPSHOT</version> | ||
| <scope>test</scope> | ||
| </dependency> |
There was a problem hiding this comment.
The module now pins kafka-clients, kafka-streams, and kafka-streams-test-utils to 4.3.0-SNAPSHOT, while other Kafka artifacts in the same POM (eg kafka-server-common and kafka_${kafka.scala.version}) still come from the parent-managed ${kafka.version}. Mixing Kafka versions in a single test classpath is very likely to cause dependency convergence / linkage issues at runtime. Consider using the same ${kafka.version} for all Kafka artifacts (ideally via dependencyManagement) and avoid hardcoding a SNAPSHOT here unless the build is explicitly set up to resolve it and all Kafka deps are aligned.
| } | ||
| } | ||
|
|
||
| private List<ConsumerRecord<GenericRecord, GenericRecord>> consumeRecords( |
There was a problem hiding this comment.
Should we add a check when received count > expected count?
There was a problem hiding this comment.
Edit: added the check after poll loop
f2e92c9 to
79a5c47
Compare
|
Hi Alieh Saeedi (@aliehsaeedii)
|
Summary
This PR adds:
SessionStoreWithHeadersIntegrationTest, which did PAPI test on sessionStoreWithHeaders, and validate store operations in the processor.
What
Checklist
Please answer the questions with Y, N or N/A if not applicable.
References
JIRA:
Test & Review
Open questions / Follow-ups