KStreams PAPI test for TimestampedKeyValueStoreWithHeaders and StatelessHeaderIntegrationTest #4211
Pull request overview
Adds new Kafka Streams integration coverage for header-based schema ID transport, including a comprehensive test suite for TimestampedKeyValueStoreWithHeaders, and renames/updates the existing stateless header transport integration test.
Changes:
- Introduce TimestampedKeyValueStoreWithHeadersIntegrationTest covering store ops + IQv1 queries + iterator/prefixScan behaviors with header-based schema IDs.
- Rename KafkaStreamsHeaderSchemaIdIntegrationTest to StatelessHeaderIntegrationTest and factor header assertions into a helper.
- Update streams-integration-tests Maven dependencies to include Kafka Streams/Clients artifacts needed by the new tests.
Reviewed changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 6 comments.
| File | Description |
|---|---|
| streams-integration-tests/src/test/java/io/confluent/kafka/streams/integration/TimestampedKeyValueStoreWithHeadersIntegrationTest.java | New end-to-end integration tests validating timestamped KV store behavior and header schema-id propagation (including changelog tombstones). |
| streams-integration-tests/src/test/java/io/confluent/kafka/streams/integration/StatelessHeaderIntegrationTest.java | Renames the stateless header-schema-id test and refactors repeated header assertions into a helper. |
| streams-integration-tests/pom.xml | Adds/adjusts Kafka dependencies for integration tests (notably Kafka Streams/Clients versions). |
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>4.3.0-SNAPSHOT</version>
<scope>test</scope>
<exclusions>
Hardcoding Kafka dependencies to 4.3.0-SNAPSHOT makes builds non-reproducible and can break CI/environments without snapshot repositories. Prefer using the repo’s managed Kafka version (e.g., ${kafka.version}/dependencyManagement) instead of a SNAPSHOT here.
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>4.3.0-SNAPSHOT</version>
<scope>test</scope>
kafka-streams is also pinned to 4.3.0-SNAPSHOT here. If the intent is just to align with the rest of the build, use the managed Kafka version property instead of overriding it in this module to avoid dependency skew.
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams-test-utils</artifactId>
<version>4.3.0-SNAPSHOT</version>
<scope>test</scope>
</dependency>
<dependency>
The module code/tests don’t appear to reference kafka-streams-test-utils; if it’s not required transitively for ClusterTestHarness, consider removing it to keep the test classpath minimal.
TengYao Chi (frankvicky)
left a comment
Thanks for the PR
assertTrue(tombstoneCount >= 4,
    "Should have at least 4 tombstone records (PUT_NULL word-1, PUT_NULL word-99, PUT_NULL word-3, DELETE word-4)");
Should we also validate which keys have been deleted?
27742fd to b149be4
Alieh Saeedi (aliehsaeedii)
left a comment
Code Review -- PR #4211
Overall Assessment
Well-structured PR with thorough PAPI test coverage for TimestampedKeyValueStoreWithHeaders. The operation-dispatch processor pattern, header assertion discipline, IQv1 verification, and changelog tombstone checks are all strong. Below are actionable items.
Issues (by priority)
[HIGH] 1. consumeRecords assertion allows silent test pass on timeout
TimestampedKeyValueStoreWithHeadersIntegrationTest.java, consumeRecords method:
assertTrue(results.size() <= expectedCount,
    "Expected " + expectedCount + " records but got " + results.size());
This asserts <= instead of ==. If the consumer times out with fewer records than expected, the test passes silently. The polling loop already caps at expectedCount, so <= is always true when the loop exits. Some call sites add their own assertEquals check afterward, but shouldVerifyIQv1Operations does not (see next item).
Fix: Change to assertEquals(expectedCount, results.size(), ...).
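To illustrate why the upper-bound check cannot fail, here is a minimal, library-free sketch. The class CappedPollSketch and its methods are hypothetical stand-ins, not the PR's consumeRecords helper: a queue models the topic, and a null poll models an empty poll before the timeout.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

public class CappedPollSketch {

    // Hypothetical stand-in for a capped polling loop: collects at most
    // expectedCount items and gives up after maxPolls attempts (the "timeout").
    public static List<String> consumeUpTo(Queue<String> source, int expectedCount, int maxPolls) {
        List<String> results = new ArrayList<>();
        for (int poll = 0; poll < maxPolls && results.size() < expectedCount; poll++) {
            String next = source.poll(); // null models an empty poll
            if (next != null) {
                results.add(next);
            }
        }
        return results;
    }

    // Weak check: always true, because the loop never collects more than expectedCount.
    public static boolean passesUpperBoundCheck(List<String> results, int expectedCount) {
        return results.size() <= expectedCount;
    }

    // Strict check: false when the loop gave up before reaching expectedCount.
    public static boolean passesExactCheck(List<String> results, int expectedCount) {
        return results.size() == expectedCount;
    }

    public static void main(String[] args) {
        // Only 3 of the 8 expected records ever arrive.
        Queue<String> topic = new ArrayDeque<>(List.of("r1", "r2", "r3"));
        List<String> results = consumeUpTo(topic, 8, 20);
        System.out.println("consumed=" + results.size());
        System.out.println("upperBoundCheck=" + passesUpperBoundCheck(results, 8));
        System.out.println("exactCheck=" + passesExactCheck(results, 8));
    }
}
```

With only 3 of 8 records available, the upper-bound check still reports success while the exact check does not, which is the silent-pass scenario described above.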
[HIGH] 2. shouldVerifyIQv1Operations does not verify consumed record count
The method calls consumeRecords(iqv1OutputTopic, "iqv1-consumer", 8) but never checks the returned list size. Combined with issue #1, the IQv1 store assertions could run against incomplete state if not all 8 records were processed.
Fix:
List<ConsumerRecord<...>> results = consumeRecords(iqv1OutputTopic, "iqv1-consumer", 8);
assertEquals(8, results.size(), "Should have 8 output records before IQv1 verification");
[MEDIUM] 3. Duplicate kafka-clients dependency in pom.xml
The PR adds a new kafka-clients entry (test scope, with cc-custom-dns-resolver-java18 exclusion), but there is already an existing kafka-clients entry with test classifier below it. Please verify this is intentional and not a conflict.
[MEDIUM] 4. handlePutNull/handleDelete mutate incoming record headers in place
record.headers().remove(SchemaId.KEY_SCHEMA_ID_HEADER);
record.headers().remove(SchemaId.VALUE_SCHEMA_ID_HEADER);
This mutates the Record's shared headers object. While it works here, it's a code smell. Consider either documenting the intent or creating a copy of headers.
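The copy-then-remove idea can be sketched without Kafka on the classpath. This is only an analogy: a LinkedHashMap stands in for the record's headers (real Kafka headers also allow duplicate keys, which a map does not), and the header name constants below are hypothetical placeholders, not the actual values of SchemaId.KEY_SCHEMA_ID_HEADER and SchemaId.VALUE_SCHEMA_ID_HEADER.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class HeaderCopySketch {

    // Hypothetical header names used for illustration only.
    public static final String KEY_SCHEMA_ID_HEADER = "key-schema-id";
    public static final String VALUE_SCHEMA_ID_HEADER = "value-schema-id";

    // Returns a copy of the headers without the schema ID entries.
    // The incoming map is never modified.
    public static Map<String, byte[]> withoutSchemaIdHeaders(Map<String, byte[]> incoming) {
        Map<String, byte[]> copy = new LinkedHashMap<>(incoming);
        copy.remove(KEY_SCHEMA_ID_HEADER);
        copy.remove(VALUE_SCHEMA_ID_HEADER);
        return copy;
    }

    public static void main(String[] args) {
        Map<String, byte[]> incoming = new LinkedHashMap<>();
        incoming.put(KEY_SCHEMA_ID_HEADER, new byte[] {1});
        incoming.put(VALUE_SCHEMA_ID_HEADER, new byte[] {2});
        incoming.put("trace-id", new byte[] {3});

        Map<String, byte[]> forwarded = withoutSchemaIdHeaders(incoming);
        System.out.println("incoming keys=" + incoming.keySet());
        System.out.println("forwarded keys=" + forwarded.keySet());
    }
}
```

In the actual processor, the equivalent would presumably be to build a fresh headers object from record.headers() and attach it to the forwarded record, so the upstream Record instance is left as it arrived.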
[MEDIUM] 5. Changelog tombstone count should use assertEquals
In shouldDeleteWithNullValueAndGetNonExistent:
assertTrue(tombstoneCount >= 4, ...);
There should be exactly 4 tombstones (word-1, word-99, word-3, word-4). Use assertEquals(4, tombstoneCount) for precision.
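Beyond the exact count, the test could also check which keys were tombstoned, as asked earlier in this thread. The sketch below is a library-free illustration under stated assumptions: TombstoneKeysSketch and its rec helper are hypothetical, and plain key/value entries stand in for the changelog's consumer records, with a null value marking a tombstone.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

public class TombstoneKeysSketch {

    // Hypothetical helper: builds a key/value entry that permits a null value.
    public static Map.Entry<String, String> rec(String key, String value) {
        return new SimpleImmutableEntry<>(key, value);
    }

    // Collects the distinct keys of all entries whose value is null (tombstones).
    public static Set<String> tombstoneKeys(List<Map.Entry<String, String>> changelog) {
        Set<String> keys = new TreeSet<>();
        for (Map.Entry<String, String> record : changelog) {
            if (record.getValue() == null) {
                keys.add(record.getKey());
            }
        }
        return keys;
    }

    public static void main(String[] args) {
        // Illustrative changelog contents, not taken from the actual test run.
        List<Map.Entry<String, String>> changelog = List.of(
            rec("word-1", "a"),
            rec("word-1", null),
            rec("word-99", null),
            rec("word-3", "c"),
            rec("word-3", null),
            rec("word-4", null));

        Set<String> expected = Set.of("word-1", "word-99", "word-3", "word-4");
        System.out.println("keysMatch=" + tombstoneKeys(changelog).equals(expected));
    }
}
```

Because the set collapses repeated tombstones for the same key, a key-set comparison complements rather than replaces the assertEquals(4, tombstoneCount) check.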
[MEDIUM] 6. Record count comment in shouldDeleteWithNullValueAndGetNonExistent is misleading
The comment // PUT(3) + GET(1) + ... + PUT_IF_ABSENT_NULL(1) + ... = 16 suggests PUT_IF_ABSENT_NULL always produces 1 output, but it produces 0 when the key is absent. The total (16) is correct; the breakdown is not. Consider updating the comment.
[LOW] 7. Missing newline at end of both Java files
Both StatelessHeaderIntegrationTest.java and TimestampedKeyValueStoreWithHeadersIntegrationTest.java are missing the final newline character.
[LOW] 8. Import ordering nit
java.util.stream.Collectors appears before java.util.concurrent.* imports in TimestampedKeyValueStoreWithHeadersIntegrationTest.java.
[LOW] 9. Stateless test coverage is minimal
StatelessHeaderIntegrationTest only tests filter. The rename from KafkaStreamsHeaderSchemaIdIntegrationTest implies broader coverage. Consider adding a TODO for map, flatMap, mapValues, selectKey, peek, branch, etc.
Per-Test Verdicts
| Test | Verdict | Key note |
|---|---|---|
| shouldPerformAllStoreOperationsWithHeaders | Pass | IQv1 query at end could verify store contents, not just assertNotNull |
| shouldVerifyIQv1Operations | Needs fix | Missing consumed record count assertion (issue #2) |
| shouldTestReadOnlyAndIteratorOperationsWithHeaders | Pass | PREFIX_SCAN byte-truncation approach is fragile -- add a comment explaining the assumption |
| shouldTestStoreStateOperations | Pass | Simple and correct |
| shouldDeleteWithNullValueAndGetNonExistent | Pass (minor fixes) | Tombstone count should use assertEquals; comment breakdown is misleading |
| shouldFilterRecordsUsingHeaderBasedSchemaId | Pass | Unchanged logic, clean refactoring |
PAPI Checklist
- Processors use correct store type (TimestampedKeyValueStoreWithHeaders)
- Headers forwarded via context.forward() from stored ValueTimestampHeaders
- All core store operations covered (get, put, delete, putIfAbsent, putAll, range, all, reverse*, prefixScan)
- IQv1 queries verify headers in stored values
- Changelog tombstones carry key schema ID headers
- punctuate not tested (acceptable for this PR's scope)
- Only filter tested for stateless header propagation
Hi Alieh Saeedi (@aliehsaeedii), thanks for the review!
Summary
This PR adds:
KafkaStreamsHeaderSchemaIdIntegrationTest
What
Checklist
Please answer the questions with Y, N or N/A if not applicable.
References
JIRA:
Test & Review
Open questions / Follow-ups