schema evolution test for kv store#4266
Conversation
|
🎉 All Contributor License Agreements have been signed. Ready to merge. |
There was a problem hiding this comment.
Pull request overview
Adds an initial integration test suite to validate Avro schema evolution behavior when Kafka Streams materializes a KTable into a header-based KV state store (schema ID carried in record headers via HeaderSchemaIdSerializer).
Changes:
- Introduces value schema evolution coverage (v1 → v2, add field with default) ensuring previously-written values remain readable.
- Introduces key schema evolution coverage demonstrating that evolving the key schema results in distinct store entries (different key byte representations).
- Adds shared test helpers to start a Streams KTable app with
DSL_STORE_FORMAT_CONFIG=HEADERS, create topics, and wait for store population.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| import org.apache.kafka.common.utils.Bytes; | ||
| import java.util.stream.Collectors; | ||
| import org.apache.kafka.streams.KafkaStreams; |
There was a problem hiding this comment.
Import ordering is inconsistent: java.util.stream.Collectors is placed after org.apache.kafka.common.utils.Bytes. This can trip import-order checks (and differs from the ordering used in KafkaStreamsHeaderSchemaIdIntegrationTest). Reorder/group imports so all java.* imports come before org.* imports.
| } catch (Exception e) { | ||
| // Store may not be ready yet | ||
| } |
There was a problem hiding this comment.
waitForStoreToContainKeys catches and ignores all exceptions. This can mask real failures (e.g., serialization/config errors) and turn them into a generic timeout. Catch only the expected transient exception for an unready store (e.g., InvalidStateStoreException) and either rethrow or record unexpected exceptions so the final assertion includes the root cause.
| builder.table( | ||
| inputTopic, | ||
| Consumed.with(createKeySerde(), createValueSerde()), | ||
| Materialized.<GenericRecord, GenericRecord, KeyValueStore<Bytes, byte[]>>as(STORE_NAME) | ||
| .withKeySerde(createKeySerde()) | ||
| .withValueSerde(createValueSerde())); |
There was a problem hiding this comment.
startTableApp creates new key/value serde instances multiple times (for Consumed and again for Materialized). This duplicates config and can create multiple schema-registry client instances unnecessarily. Consider constructing the key/value serdes once and reusing them for both Consumed.with(...) and Materialized.with*Serde(...).
| builder.table( | |
| inputTopic, | |
| Consumed.with(createKeySerde(), createValueSerde()), | |
| Materialized.<GenericRecord, GenericRecord, KeyValueStore<Bytes, byte[]>>as(STORE_NAME) | |
| .withKeySerde(createKeySerde()) | |
| .withValueSerde(createValueSerde())); | |
| GenericAvroSerde keySerde = createKeySerde(); | |
| GenericAvroSerde valueSerde = createValueSerde(); | |
| builder.table( | |
| inputTopic, | |
| Consumed.with(keySerde, valueSerde), | |
| Materialized.<GenericRecord, GenericRecord, KeyValueStore<Bytes, byte[]>>as(STORE_NAME) | |
| .withKeySerde(keySerde) | |
| .withValueSerde(valueSerde)); |
|
|
||
| // Lookup with v1 key STILL finds the old entry (it was not overwritten) | ||
| ValueAndTimestamp<GenericRecord> resultV1Again = | ||
| store.get(newRecord(KEY_SCHEMA_V1, "sensorId", "sensor-1")); |
There was a problem hiding this comment.
Why not store.get(key)?
There was a problem hiding this comment.
the whole newRecord(...) is just the key. I could rewrite that to a key variable to avoid creating the same key twice(one in producer, one during querying).
| "v2 key lookup should return the value produced with v2 key"); | ||
|
|
||
| // Lookup with v1 key STILL finds the old entry (it was not overwritten) | ||
| ValueAndTimestamp<GenericRecord> resultV1Again = |
There was a problem hiding this comment.
Why ValueAndTimestamp?
There was a problem hiding this comment.
I switched it back to ValueTimestampHeaders type to align with other IQv1 tests.
What
Checklist
Please answer the questions with Y, N or N/A if not applicable.
References
JIRA:
Test & Review
Open questions / Follow-ups