schema evolution test for kv store #4266

Draft
Lucy Liu (lucliu1108) wants to merge 8 commits into confluentinc:master from lucliu1108:schema-evolution-test

Conversation

@lucliu1108
Member

What

Checklist

Please answer the questions with Y, N or N/A if not applicable.

  • [ ] Contains customer facing changes? Including API/behavior changes
  • [ ] Is this change gated behind config(s)?
    • List the config(s) needed to be set to enable this change
  • [ ] Did you add sufficient unit test and/or integration test coverage for this PR?
    • If not, please explain why it is not required
  • [ ] Does this change require modifying existing system tests or adding new system tests?
    • If so, include tracking information for the system test changes
  • [ ] Must this be released together with other change(s), either in this repo or another one?
    • If so, please include the link(s) to the changes that must be released together

References

JIRA:

Test & Review

Open questions / Follow-ups

Copilot AI review requested due to automatic review settings April 17, 2026 17:07
@confluent-cla-assistant

🎉 All Contributor License Agreements have been signed. Ready to merge.
Please push an empty commit if you would like to re-run the checks to verify CLA status for all contributors.

Copilot AI left a comment

Pull request overview

Adds an initial integration test suite to validate Avro schema evolution behavior when Kafka Streams materializes a KTable into a header-based KV state store (schema ID carried in record headers via HeaderSchemaIdSerializer).

Changes:

  • Introduces value schema evolution coverage (v1 → v2, add field with default) ensuring previously-written values remain readable.
  • Introduces key schema evolution coverage demonstrating that evolving the key schema results in distinct store entries (different key byte representations).
  • Adds shared test helpers to start a Streams KTable app with DSL_STORE_FORMAT_CONFIG=HEADERS, create topics, and wait for store population.
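The key-evolution point above can be illustrated without Kafka or Avro. The following is a minimal sketch, not the PR's test code: `KeyBytes` stands in for `org.apache.kafka.common.utils.Bytes`, `encode` is a simplified length-prefixed encoding standing in for Avro binary encoding, and the `region` field with default `"unknown"` is a hypothetical v2 addition. It only shows why two encodings of the same logical key land in separate store entries.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

public class KeyEvolutionSketch {

    // Gives byte[] value-based equals/hashCode, the role that
    // org.apache.kafka.common.utils.Bytes plays for store keys.
    static final class KeyBytes {
        private final byte[] data;

        KeyBytes(byte[] data) {
            this.data = data.clone();
        }

        @Override
        public boolean equals(Object o) {
            return o instanceof KeyBytes && Arrays.equals(data, ((KeyBytes) o).data);
        }

        @Override
        public int hashCode() {
            return Arrays.hashCode(data);
        }
    }

    // Simplified stand-in for Avro binary encoding (an assumption, not real Avro):
    // each string field is written as a one-byte length plus its UTF-8 bytes,
    // in schema order.
    static KeyBytes encode(String... fields) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        for (String field : fields) {
            byte[] utf8 = field.getBytes(StandardCharsets.UTF_8);
            out.write(utf8.length);
            out.write(utf8, 0, utf8.length);
        }
        return new KeyBytes(out.toByteArray());
    }

    static Map<KeyBytes, String> buildStore() {
        Map<KeyBytes, String> store = new HashMap<>();
        // v1 key schema: { sensorId }
        store.put(encode("sensor-1"), "value-written-with-v1-key");
        // Hypothetical v2 key schema: { sensorId, region (default "unknown") }
        store.put(encode("sensor-1", "unknown"), "value-written-with-v2-key");
        return store;
    }

    public static void main(String[] args) {
        Map<KeyBytes, String> store = buildStore();
        System.out.println(store.size());                  // prints 2
        System.out.println(store.get(encode("sensor-1"))); // prints value-written-with-v1-key
    }
}
```

Because the store compares serialized bytes rather than logical records, the extra field changes the lookup key even though `sensorId` is unchanged, so the v1 entry is neither found nor overwritten by the v2 key.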

Comment on lines +47 to +49
import org.apache.kafka.common.utils.Bytes;
import java.util.stream.Collectors;
import org.apache.kafka.streams.KafkaStreams;

Copilot AI Apr 17, 2026

Import ordering is inconsistent: java.util.stream.Collectors is placed after org.apache.kafka.common.utils.Bytes. This can trip import-order checks (and differs from the ordering used in KafkaStreamsHeaderSchemaIdIntegrationTest). Reorder/group imports so all java.* imports come before org.* imports.

Comment on lines +424 to +426
} catch (Exception e) {
// Store may not be ready yet
}

Copilot AI Apr 17, 2026

waitForStoreToContainKeys catches and ignores all exceptions. This can mask real failures (e.g., serialization/config errors) and turn them into a generic timeout. Catch only the expected transient exception for an unready store (e.g., InvalidStateStoreException) and either rethrow or record unexpected exceptions so the final assertion includes the root cause.
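One way the narrowed catch could look is sketched below in plain Java so that it compiles without the Kafka dependency. `StoreNotReadyException` is a hypothetical stand-in for Kafka Streams' `InvalidStateStoreException`, and `waitUntil` with its parameters is an illustrative name, not the helper from this PR.

```java
import java.util.function.BooleanSupplier;

public class StoreWaitSketch {

    // Hypothetical stand-in for Kafka Streams' InvalidStateStoreException.
    static class StoreNotReadyException extends RuntimeException {
        StoreNotReadyException(String message) {
            super(message);
        }
    }

    // Polls until check returns true. Only StoreNotReadyException is treated as
    // transient; any other exception propagates immediately to the caller.
    static void waitUntil(BooleanSupplier check, long timeoutMs, long pollMs) {
        long deadline = System.currentTimeMillis() + timeoutMs;
        StoreNotReadyException lastTransient = null;
        while (System.currentTimeMillis() < deadline) {
            try {
                if (check.getAsBoolean()) {
                    return;
                }
            } catch (StoreNotReadyException e) {
                // Expected while the store is still starting up; remember it
                // so a timeout can report the most recent cause.
                lastTransient = e;
            }
            try {
                Thread.sleep(pollMs);
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
                throw new AssertionError("Interrupted while waiting for store", ie);
            }
        }
        throw new AssertionError(
            "Store did not contain the expected keys within " + timeoutMs + " ms",
            lastTransient);
    }
}
```

With this shape, a serialization or configuration error surfaces at once with its own stack trace, while a genuine timeout still carries the last transient exception as its cause.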

Comment on lines +324 to +329
builder.table(
inputTopic,
Consumed.with(createKeySerde(), createValueSerde()),
Materialized.<GenericRecord, GenericRecord, KeyValueStore<Bytes, byte[]>>as(STORE_NAME)
.withKeySerde(createKeySerde())
.withValueSerde(createValueSerde()));

Copilot AI Apr 17, 2026

startTableApp creates new key/value serde instances multiple times (for Consumed and again for Materialized). This duplicates config and can create multiple schema-registry client instances unnecessarily. Consider constructing the key/value serdes once and reusing them for both Consumed.with(...) and Materialized.with*Serde(...).

Suggested change
- builder.table(
-     inputTopic,
-     Consumed.with(createKeySerde(), createValueSerde()),
-     Materialized.<GenericRecord, GenericRecord, KeyValueStore<Bytes, byte[]>>as(STORE_NAME)
-         .withKeySerde(createKeySerde())
-         .withValueSerde(createValueSerde()));
+ GenericAvroSerde keySerde = createKeySerde();
+ GenericAvroSerde valueSerde = createValueSerde();
+ builder.table(
+     inputTopic,
+     Consumed.with(keySerde, valueSerde),
+     Materialized.<GenericRecord, GenericRecord, KeyValueStore<Bytes, byte[]>>as(STORE_NAME)
+         .withKeySerde(keySerde)
+         .withValueSerde(valueSerde));

@lucliu1108 Lucy Liu (lucliu1108) changed the title from "add initial schema evolution test for kv store" to "schema evolution test for kv store" on Apr 17, 2026

// Lookup with v1 key STILL finds the old entry (it was not overwritten)
ValueAndTimestamp<GenericRecord> resultV1Again =
store.get(newRecord(KEY_SCHEMA_V1, "sensorId", "sensor-1"));

Why not store.get(key)?

Member Author

The whole newRecord(...) expression is just the key. I could extract it into a key variable to avoid creating the same key twice (once in the producer, once when querying).

"v2 key lookup should return the value produced with v2 key");

// Lookup with v1 key STILL finds the old entry (it was not overwritten)
ValueAndTimestamp<GenericRecord> resultV1Again =

Why ValueAndTimestamp?

Member Author

@lucliu1108 Lucy Liu (lucliu1108) Apr 21, 2026

I switched it back to ValueTimestampHeaders type to align with other IQv1 tests.

3 participants