Skip to content

KStreams PAPI test for TimestampedKeyValueStoreWithHeaders and StatelessHeaderIntegrationTest#4211

Open
Lucy Liu (lucliu1108) wants to merge 11 commits into
confluentinc:masterfrom
lucliu1108:timestamped-kv-store-header-test
Open

KStreams PAPI test for TimestampedKeyValueStoreWithHeaders and StatelessHeaderIntegrationTest#4211
Lucy Liu (lucliu1108) wants to merge 11 commits into
confluentinc:masterfrom
lucliu1108:timestamped-kv-store-header-test

Conversation

@lucliu1108
Copy link
Copy Markdown
Member

Summary

This PR adds:

  1. StatelessHeaderIntegrationTest, rename from the previous KafkaStreamsHeaderSchemaIdIntegrationTest
  2. TimestampedKeyValueStoreWithHeaders, which did PAPI test on timestampedKeyValueStoreWithHeaders, and validate store operations in the processor.

What

Checklist

Please answer the questions with Y, N or N/A if not applicable.

  • [ ] Contains customer facing changes? Including API/behavior changes
  • [ ] Is this change gated behind config(s)?
    • List the config(s) needed to be set to enable this change
  • [ ] Did you add sufficient unit test and/or integration test coverage for this PR?
    • If not, please explain why it is not required
  • [ ] Does this change require modifying existing system tests or adding new system tests?
    • If so, include tracking information for the system test changes
  • [ ] Must this be released together with other change(s), either in this repo or another one?
    • If so, please include the link(s) to the changes that must be released together

References

JIRA:

Test & Review

Open questions / Follow-ups

@lucliu1108 Lucy Liu (lucliu1108) requested a review from a team as a code owner March 20, 2026 14:53
Copilot AI review requested due to automatic review settings March 20, 2026 14:53
@confluent-cla-assistant
Copy link
Copy Markdown

🎉 All Contributor License Agreements have been signed. Ready to merge.
Please push an empty commit if you would like to re-run the checks to verify CLA status for all contributors.

@lucliu1108 Lucy Liu (lucliu1108) changed the title KStreams TimestampedKeyValueStoreWithHeaders and StatelessHeaderIntegrationTest KStreams PAPI test for TimestampedKeyValueStoreWithHeaders and StatelessHeaderIntegrationTest Mar 20, 2026
Copy link
Copy Markdown

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Adds new Kafka Streams integration coverage for header-based schema ID transport, including a comprehensive test suite for TimestampedKeyValueStoreWithHeaders, and renames/updates the existing stateless header transport integration test.

Changes:

  • Introduce TimestampedKeyValueStoreWithHeadersIntegrationTest covering store ops + IQv1 queries + iterator/prefixScan behaviors with header-based schema IDs.
  • Rename KafkaStreamsHeaderSchemaIdIntegrationTest to StatelessHeaderIntegrationTest and factor header assertions into a helper.
  • Update streams-integration-tests Maven dependencies to include Kafka Streams/Clients artifacts needed by the new tests.

Reviewed changes

Copilot reviewed 3 out of 3 changed files in this pull request and generated 6 comments.

File Description
streams-integration-tests/src/test/java/io/confluent/kafka/streams/integration/TimestampedKeyValueStoreWithHeadersIntegrationTest.java New end-to-end integration tests validating timestamped KV store behavior and header schema-id propagation (including changelog tombstones).
streams-integration-tests/src/test/java/io/confluent/kafka/streams/integration/StatelessHeaderIntegrationTest.java Renames the stateless header-schema-id test and refactors repeated header assertions into a helper.
streams-integration-tests/pom.xml Adds/adjusts Kafka dependencies for integration tests (notably Kafka Streams/Clients versions).

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment on lines 69 to +73
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>4.3.0-SNAPSHOT</version>
<scope>test</scope>
<exclusions>
Copy link

Copilot AI Mar 20, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hardcoding Kafka dependencies to 4.3.0-SNAPSHOT makes builds non-reproducible and can break CI/environments without snapshot repositories. Prefer using the repo’s managed Kafka version (e.g., ${kafka.version}/dependencyManagement) instead of a SNAPSHOT here.

Copilot uses AI. Check for mistakes.
Comment on lines 94 to 97
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>4.3.0-SNAPSHOT</version>
<scope>test</scope>
Copy link

Copilot AI Mar 20, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

kafka-streams is also pinned to 4.3.0-SNAPSHOT here. If the intent is just to align with the rest of the build, use the managed Kafka version property instead of overriding it in this module to avoid dependency skew.

Copilot uses AI. Check for mistakes.
Comment on lines +106 to 111
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams-test-utils</artifactId>
<version>4.3.0-SNAPSHOT</version>
<scope>test</scope>
</dependency>
<dependency>
Copy link

Copilot AI Mar 20, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The module code/tests don’t appear to reference kafka-streams-test-utils; if it’s not required transitively for ClusterTestHarness, consider removing it to keep the test classpath minimal.

Suggested change
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams-test-utils</artifactId>
<version>4.3.0-SNAPSHOT</version>
<scope>test</scope>
</dependency>
<dependency>

Copilot uses AI. Check for mistakes.
Copy link
Copy Markdown
Member

@frankvicky TengYao Chi (frankvicky) left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the PR

Comment on lines +795 to +796
assertTrue(tombstoneCount >= 4,
"Should have at least 4 tombstone records (PUT_NULL word-1, PUT_NULL word-99, PUT_NULL word-3, DELETE word-4)");
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we validate that which keys have been deleted?

@lucliu1108 Lucy Liu (lucliu1108) force-pushed the timestamped-kv-store-header-test branch from 27742fd to b149be4 Compare April 14, 2026 19:35
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review -- PR #4211

Overall Assessment

Well-structured PR with thorough PAPI test coverage for TimestampedKeyValueStoreWithHeaders. The operation-dispatch processor pattern, header assertion discipline, IQv1 verification, and changelog tombstone checks are all strong. Below are actionable items.


Issues (by priority)

[HIGH] 1. consumeRecords assertion allows silent test pass on timeout

TimestampedKeyValueStoreWithHeadersIntegrationTest.java, consumeRecords method:

assertTrue(results.size() <= expectedCount,
    "Expected " + expectedCount + " records but got " + results.size());

This asserts <= instead of ==. If the consumer times out with fewer records than expected, the test passes silently. The polling loop already caps at expectedCount, so <= is always true when the loop exits. Some call sites add their own assertEquals check afterward, but shouldVerifyIQv1Operations does not (see next item).

Fix: Change to assertEquals(expectedCount, results.size(), ...).


[HIGH] 2. shouldVerifyIQv1Operations does not verify consumed record count

The method calls consumeRecords(iqv1OutputTopic, "iqv1-consumer", 8) but never checks the returned list size. Combined with issue #1, the IQv1 store assertions could run against incomplete state if not all 8 records were processed.

Fix:

List<ConsumerRecord<...>> results = consumeRecords(iqv1OutputTopic, "iqv1-consumer", 8);
assertEquals(8, results.size(), "Should have 8 output records before IQv1 verification");

[MEDIUM] 3. Duplicate kafka-clients dependency in pom.xml

The PR adds a new kafka-clients entry (test scope, with cc-custom-dns-resolver-java18 exclusion), but there is already an existing kafka-clients entry with test classifier below it. Please verify this is intentional and not a conflict.


[MEDIUM] 4. handlePutNull/handleDelete mutate incoming record headers in place

record.headers().remove(SchemaId.KEY_SCHEMA_ID_HEADER);
record.headers().remove(SchemaId.VALUE_SCHEMA_ID_HEADER);

This mutates the Record's shared headers object. While it works here, it's a code smell. Consider either documenting the intent or creating a copy of headers.


[MEDIUM] 5. Changelog tombstone count should use assertEquals

In shouldDeleteWithNullValueAndGetNonExistent:

assertTrue(tombstoneCount >= 4, ...);

There should be exactly 4 tombstones (word-1, word-99, word-3, word-4). Use assertEquals(4, tombstoneCount) for precision.


[MEDIUM] 6. Record count comment in shouldDeleteWithNullValueAndGetNonExistent is misleading

The comment // PUT(3) + GET(1) + ... + PUT_IF_ABSENT_NULL(1) + ... = 16 suggests PUT_IF_ABSENT_NULL always produces 1 output, but it produces 0 when the key is absent. The total (16) is correct; the breakdown is not. Consider updating the comment.


[LOW] 7. Missing newline at end of both Java files

Both StatelessHeaderIntegrationTest.java and TimestampedKeyValueStoreWithHeadersIntegrationTest.java are missing the final newline character.


[LOW] 8. Import ordering nit

java.util.stream.Collectors appears before java.util.concurrent.* imports in TimestampedKeyValueStoreWithHeadersIntegrationTest.java.


[LOW] 9. Stateless test coverage is minimal

StatelessHeaderIntegrationTest only tests filter. The rename from KafkaStreamsHeaderSchemaIdIntegrationTest implies broader coverage. Consider adding a TODO for map, flatMap, mapValues, selectKey, peek, branch, etc.


Per-Test Verdicts

Test Verdict Key note
shouldPerformAllStoreOperationsWithHeaders Pass IQv1 query at end could verify store contents, not just assertNotNull
shouldVerifyIQv1Operations Needs fix Missing consumed record count assertion (issue #2)
shouldTestReadOnlyAndIteratorOperationsWithHeaders Pass PREFIX_SCAN byte-truncation approach is fragile -- add a comment explaining the assumption
shouldTestStoreStateOperations Pass Simple and correct
shouldDeleteWithNullValueAndGetNonExistent Pass (minor fixes) Tombstone count should use assertEquals; comment breakdown is misleading
shouldFilterRecordsUsingHeaderBasedSchemaId Pass Unchanged logic, clean refactoring

PAPI Checklist

  • Processors use correct store type (TimestampedKeyValueStoreWithHeaders)
  • Headers forwarded via context.forward() from stored ValueTimestampHeaders
  • All core store operations covered (get, put, delete, putIfAbsent, putAll, range, all, reverse*, prefixScan)
  • IQv1 queries verify headers in stored values
  • Changelog tombstones carry key schema ID headers
  • punctuate not tested (acceptable for this PR's scope)
  • Only filter tested for stateless header propagation

@lucliu1108
Copy link
Copy Markdown
Member Author

Lucy Liu (lucliu1108) commented Jun 1, 2026

Hi Alieh Saeedi (@aliehsaeedii) , thanks for the review!
I addressed the comments:

  • HIGH 1 — consumeRecords assertTrue(<=) allows silent timeout → Changed to assertEquals(expectedCount, results.size(), ...), also replaced the original consumerRecords method with a single generic <V> consumeRecords(topic, groupId, expectedCount, Class<?> valueDeserializerClass) that ends with assertEquals(expectedCount, results.size(), ...). The check now covers both regular and changelog calls (used to be two separate methods)
  • HIGH 2 — shouldVerifyIQv1Operations no record count check → Captured consumeRecords return value and added assertEquals(8, iqv1Results.size(), ...)
  • MEDIUM 3 — Duplicate kafka-clients dep in pom → Removed the no-classifier kafka-clients block; kept classifier=test; compile passes
  • MEDIUM 4 — handlePutNull/handleDelete mutate headers in place → Added a comment explaining that mutation is done in place here.
  • MEDIUM 5 — Tombstone assertTrue(>= 4) imprecise → Should be 6 tombstone
  • LOW 7 — Missing trailing newline → Added to both TimestampedKeyValueStoreWithHeadersIntegrationTest.java and StatelessHeaderIntegrationTest.java
  • LOW 8 — Import ordering: stream.Collectors before concurrent.* → Reordered to put concurrent.* first

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants