KStreams PAPI test for SessionStoreWithHeaders#4213

Open
Lucy Liu (lucliu1108) wants to merge 5 commits into confluentinc:master from lucliu1108:session-store-with-header-test

@lucliu1108
Member

Summary

This PR adds:

SessionStoreWithHeadersIntegrationTest, which runs PAPI tests against SessionStoreWithHeaders and validates store operations inside the processor.

What

Checklist

Please answer the questions with Y, N or N/A if not applicable.

  • [ ] Contains customer facing changes? Including API/behavior changes
  • [ ] Is this change gated behind config(s)?
    • List the config(s) needed to be set to enable this change
  • [ ] Did you add sufficient unit test and/or integration test coverage for this PR?
    • If not, please explain why it is not required
  • [ ] Does this change require modifying existing system tests or adding new system tests?
    • If so, include tracking information for the system test changes
  • [ ] Must this be released together with other change(s), either in this repo or another one?
    • If so, please include the link(s) to the changes that must be released together

References

JIRA:

Test & Review

Open questions / Follow-ups

@lucliu1108 Lucy Liu (lucliu1108) requested a review from a team as a code owner March 20, 2026 15:19
Copilot AI review requested due to automatic review settings March 20, 2026 15:19
@confluent-cla-assistant

🎉 All Contributor License Agreements have been signed. Ready to merge.
Please push an empty commit if you would like to re-run the checks to verify CLA status for all contributors.

Copilot AI left a comment

Pull request overview

Adds a Kafka Streams integration test suite to exercise SessionStoreWithHeaders behavior when using header-based Schema ID transport, including processor-driven store operations, IQv1 querying, and changelog tombstone header validation.

Changes:

  • Introduces SessionStoreWithHeadersIntegrationTest covering many session-store operations and verifying schema-id headers on produced results.
  • Adds an IQv1 test path using a custom QueryableStoreType for SessionStoreWithHeaders.
  • Updates streams-integration-tests Maven dependencies (Kafka clients/streams and test-utils).

Reviewed changes

Copilot reviewed 2 out of 2 changed files in this pull request and generated 9 comments.

| File | Description |
| --- | --- |
| streams-integration-tests/src/test/java/io/confluent/kafka/streams/integration/SessionStoreWithHeadersIntegrationTest.java | New integration tests validating SessionStoreWithHeaders operations and header-based schema-id behavior (including IQv1 + changelog tombstones). |
| streams-integration-tests/pom.xml | Adjusts Kafka-related test dependencies/versions for the integration-tests module. |

Comment on lines +589 to +596
// Consume outputs: 5 PUTs + 1 REMOVE = 6 records
consumeRecords(iqv1OutputTopic, "iqv1-session-consumer", 6);

// Query store via IQv1
ReadOnlySessionStore<GenericRecord, AggregationWithHeaders<GenericRecord>> store =
    streams.store(
        StoreQueryParameters.fromNameAndType(iqv1StoreName, new SessionStoreWithHeadersType<>()));
assertNotNull(store, "Store should be accessible via IQv1");
Copilot AI Mar 20, 2026

In shouldVerifyIQv1Operations, the call to consumeRecords(..., 6) ignores the returned list and does not assert that the expected 6 output records were actually produced/consumed before querying the store. If the poll loop times out early, the subsequent IQv1 queries can run before all inputs are processed, making the test flaky. Consider asserting the consumed record count (or otherwise waiting for processing) before performing the store queries.
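One way to close that gap is to have the consuming helper itself enforce the count. The following is a minimal sketch, not the PR's actual `consumeRecords`: `PollUntilCount`, `consumeExactly`, and the `poll` function are hypothetical names, and `poll` stands in for a real consumer's blocking poll call so the example runs without a broker.

```java
import java.time.Duration;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Function;

final class PollUntilCount {

    // Hypothetical helper: `poll` is a stand-in for a real consumer's poll call.
    static <T> List<T> consumeExactly(
            Function<Duration, List<T>> poll, int expectedCount, Duration timeout) {
        List<T> results = new ArrayList<>();
        long start = System.nanoTime();
        // Keep polling until enough records have arrived or the timeout has elapsed.
        while (results.size() < expectedCount
                && System.nanoTime() - start < timeout.toNanos()) {
            results.addAll(poll.apply(Duration.ofMillis(50)));
        }
        // Strict check after the loop: fails on a timeout (too few records)
        // and on unexpected extra records (too many).
        if (results.size() != expectedCount) {
            throw new AssertionError(
                "Expected " + expectedCount + " records but consumed " + results.size());
        }
        return results;
    }

    public static void main(String[] args) {
        // Simulated source: three batches of two records each, then nothing.
        Deque<List<String>> batches = new ArrayDeque<>(
            List.of(List.of("a", "b"), List.of("c", "d"), List.of("e", "f")));
        Function<Duration, List<String>> poll =
            d -> batches.isEmpty() ? List.of() : batches.poll();

        List<String> records = consumeExactly(poll, 6, Duration.ofSeconds(1));
        System.out.println(records);
    }
}
```

With a check like this inside the helper, a caller that ignores the returned list still cannot proceed to the store queries unless all expected records were consumed.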

Comment on lines +1587 to +1590
Windowed<GenericRecord> sessionKey = new Windowed<>(
    record.key(),
    new org.apache.kafka.streams.kstream.internals.SessionWindow(record.timestamp(), record.timestamp())
);
Copilot AI Mar 20, 2026

Another construction of a session window via the ...internals.SessionWindow class. Consider reusing the same helper for building Windowed<GenericRecord> keys so the internal dependency is localized and easier to update if Kafka changes the internal type.

Comment on lines +106 to 111
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-streams-test-utils</artifactId>
  <version>4.3.0-SNAPSHOT</version>
  <scope>test</scope>
</dependency>
<dependency>
Copilot AI Mar 20, 2026

kafka-streams-test-utils is added as a direct dependency, but it doesn't appear to be used anywhere under streams-integration-tests/src/test/java (no imports/usages found). If it’s not required, consider removing it to keep the test classpath minimal; if it is required transitively, adding a comment explaining why would help future maintenance.

Suggested change
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-streams-test-utils</artifactId>
  <version>4.3.0-SNAPSHOT</version>
  <scope>test</scope>
</dependency>
<dependency>

Comment on lines 68 to +110
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>4.3.0-SNAPSHOT</version>
  <scope>test</scope>
  <exclusions>
    <exclusion>
      <groupId>io.confluent.networking</groupId>
      <artifactId>cc-custom-dns-resolver-java18</artifactId>
    </exclusion>
  </exclusions>
</dependency>
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>4.3.0-SNAPSHOT</version>
  <classifier>test</classifier>
  <scope>test</scope>
  <exclusions>
    <exclusion>
      <groupId>io.confluent.networking</groupId>
      <artifactId>cc-custom-dns-resolver-java18</artifactId>
    </exclusion>
  </exclusions>
</dependency>
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-streams</artifactId>
  <version>4.3.0-SNAPSHOT</version>
  <scope>test</scope>
  <exclusions>
    <exclusion>
      <groupId>io.confluent.networking</groupId>
      <artifactId>cc-custom-dns-resolver-java18</artifactId>
    </exclusion>
  </exclusions>
</dependency>
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-streams-test-utils</artifactId>
  <version>4.3.0-SNAPSHOT</version>
  <scope>test</scope>
</dependency>
Copilot AI Mar 20, 2026

The module now pins kafka-clients, kafka-streams, and kafka-streams-test-utils to 4.3.0-SNAPSHOT, while other Kafka artifacts in the same POM (e.g., kafka-server-common and kafka_${kafka.scala.version}) still come from the parent-managed ${kafka.version}. Mixing Kafka versions in a single test classpath is very likely to cause dependency convergence or linkage issues at runtime. Consider using the same ${kafka.version} for all Kafka artifacts (ideally via dependencyManagement), and avoid hardcoding a SNAPSHOT here unless the build is explicitly set up to resolve it and all Kafka dependencies are aligned.

}
}

private List<ConsumerRecord<GenericRecord, GenericRecord>> consumeRecords(
Member

Should we add a check for the case where the received count is greater than the expected count?

Member Author

Edit: added the check after the poll loop.

@lucliu1108
Member Author

Hi Alieh Saeedi (@aliehsaeedii),
I applied the fixes suggested in #4204:

  • Must-Fix 1 (session ordering bug): assertRangeResults no longer uses a Map. It now compares index by index (expectedPairs[i] vs results.get(startIdx + i)), so backward-fetch tests can't pass with forward-ordered results.
  • Must-Fix 3 (consumeRecords silent timeout): unified into a single generic <V> consumeRecords(topic, groupId, expectedCount, Class<?> valueDeserializerClass) with a strict assertEquals(expectedCount, results.size(), ...). It now covers both regular and changelog calls.
  • Should-Fix 4 (duplicate lines): removed createConsumerProps, createChangelogConsumerProps, and consumeChangelogRecords.
  • Per-test 11 (shouldVerifyIQv1Operations no count check): captured iqv1Results and added assertEquals(6, iqv1Results.size(), ...).
  • Should-Fix 7 (trailing newline): added at EOF.
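
To illustrate why the Map-based comparison in Must-Fix 1 could not detect ordering bugs, here is a small self-contained sketch. It is not the PR's assertRangeResults: `OrderedRangeAssert`, `matchesIgnoringOrder`, and `matchesInOrder` are hypothetical names, and plain String/Integer pairs stand in for windowed session keys and aggregates.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class OrderedRangeAssert {

    // Order-insensitive check: collapsing both lists into Maps discards
    // the order in which the store iterator returned the entries.
    static <K, V> boolean matchesIgnoringOrder(
            List<Map.Entry<K, V>> expected, List<Map.Entry<K, V>> actual) {
        Map<K, V> expectedMap = new HashMap<>();
        Map<K, V> actualMap = new HashMap<>();
        expected.forEach(e -> expectedMap.put(e.getKey(), e.getValue()));
        actual.forEach(e -> actualMap.put(e.getKey(), e.getValue()));
        return expectedMap.equals(actualMap);
    }

    // Order-sensitive check: compares expected[i] with actual[startIdx + i].
    static <K, V> boolean matchesInOrder(
            List<Map.Entry<K, V>> expected, List<Map.Entry<K, V>> actual, int startIdx) {
        if (startIdx < 0 || startIdx + expected.size() > actual.size()) {
            return false;
        }
        for (int i = 0; i < expected.size(); i++) {
            if (!expected.get(i).equals(actual.get(startIdx + i))) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // A backward fetch should yield s3, s2, s1; here the results came back forward.
        List<Map.Entry<String, Integer>> expectedBackward =
            List.of(Map.entry("s3", 3), Map.entry("s2", 2), Map.entry("s1", 1));
        List<Map.Entry<String, Integer>> actualForward =
            List.of(Map.entry("s1", 1), Map.entry("s2", 2), Map.entry("s3", 3));

        System.out.println(matchesIgnoringOrder(expectedBackward, actualForward)); // true
        System.out.println(matchesInOrder(expectedBackward, actualForward, 0));    // false
    }
}
```

The Map-based check accepts forward-ordered results for a backward fetch, while the index-by-index check rejects them.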
