From d9b3275510e828721fa35c22dd7b45186c2e44a1 Mon Sep 17 00:00:00 2001 From: "Matthias J. Sax" Date: Thu, 2 Apr 2026 22:30:41 -0700 Subject: [PATCH 1/6] MINOR: Fix StoreBuilderWrapper for header-stores StoreBuilderWrapper must recongnize header-stores correctly. Otherwise, we create changelog topics with incorrect topi configuration. --- .../internals/StoreBuilderWrapper.java | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreBuilderWrapper.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreBuilderWrapper.java index be915a8dfb637..418712188677d 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreBuilderWrapper.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreBuilderWrapper.java @@ -19,7 +19,9 @@ import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.state.StoreBuilder; import org.apache.kafka.streams.state.internals.SessionStoreBuilder; +import org.apache.kafka.streams.state.internals.SessionStoreBuilderWithHeaders; import org.apache.kafka.streams.state.internals.TimestampedWindowStoreBuilder; +import org.apache.kafka.streams.state.internals.TimestampedWindowStoreWithHeadersBuilder; import org.apache.kafka.streams.state.internals.VersionedKeyValueStoreBuilder; import org.apache.kafka.streams.state.internals.WindowStoreBuilder; @@ -69,11 +71,14 @@ public long retentionPeriod() { return ((WindowStoreBuilder) builder).retentionPeriod(); } else if (builder instanceof TimestampedWindowStoreBuilder) { return ((TimestampedWindowStoreBuilder) builder).retentionPeriod(); + } else if (builder instanceof TimestampedWindowStoreWithHeadersBuilder) { + return ((TimestampedWindowStoreWithHeadersBuilder) builder).retentionPeriod(); } else if (builder instanceof SessionStoreBuilder) { return ((SessionStoreBuilder) builder).retentionPeriod(); + } else if (builder instanceof SessionStoreBuilderWithHeaders) { + return ((SessionStoreBuilderWithHeaders) builder).retentionPeriod(); } else { - throw new IllegalStateException( - "retentionPeriod is not supported when not a window store"); + throw new IllegalStateException("retentionPeriod is not supported when not a window store"); } } @@ -82,8 +87,7 @@ public long historyRetention() { if (builder instanceof VersionedKeyValueStoreBuilder) { return ((VersionedKeyValueStoreBuilder) builder).historyRetention(); } else { - throw new IllegalStateException( - "historyRetention is not supported when not a versioned store"); + throw new IllegalStateException("historyRetention is not supported when not a versioned store"); } } @@ -105,8 +109,10 @@ public String storeName() { @Override public boolean isWindowStore() { return builder instanceof WindowStoreBuilder - || builder instanceof TimestampedWindowStoreBuilder - || builder instanceof SessionStoreBuilder; + || builder instanceof TimestampedWindowStoreBuilder + || builder instanceof TimestampedWindowStoreWithHeadersBuilder + || builder instanceof SessionStoreBuilder + || builder instanceof SessionStoreBuilderWithHeaders; } @Override From c24dcc3cbf8ea9aee5f54085427e498c61b06a7f Mon Sep 17 00:00:00 2001 From: "Matthias J. Sax" Date: Fri, 3 Apr 2026 09:30:00 -0700 Subject: [PATCH 2/6] random fixes --- .../internals/AbstractConfigurableStoreFactory.java | 8 ++++---- .../kafka/streams/state/internals/UtilsTest.java | 13 +++++++------ 2 files changed, 11 insertions(+), 10 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractConfigurableStoreFactory.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractConfigurableStoreFactory.java index 99c8c5a2bc66f..6ca5142994dda 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractConfigurableStoreFactory.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractConfigurableStoreFactory.java @@ -27,13 +27,13 @@ public abstract class AbstractConfigurableStoreFactory implements StoreFactory { private final Set connectedProcessorNames = new HashSet<>(); private DslStoreSuppliers dslStoreSuppliers; - private final DslStoreFormat defaultStoreDefaultFormat; + private final DslStoreFormat defaultDslStoreFormat; private DslStoreFormat dslStoreFormatOverwrite; public AbstractConfigurableStoreFactory(final DslStoreSuppliers initialStoreSuppliers, - final DslStoreFormat defaultStoreDefaultFormat) { + final DslStoreFormat defaultDslStoreFormat) { this.dslStoreSuppliers = initialStoreSuppliers; - this.defaultStoreDefaultFormat = defaultStoreDefaultFormat; + this.defaultDslStoreFormat = defaultDslStoreFormat; } @Override @@ -57,7 +57,7 @@ public Set connectedProcessorNames() { } public DslStoreFormat dslStoreFormat() { - return dslStoreFormatOverwrite == null ? defaultStoreDefaultFormat : dslStoreFormatOverwrite; + return dslStoreFormatOverwrite == null ? defaultDslStoreFormat : dslStoreFormatOverwrite; } protected DslStoreSuppliers dslStoreSuppliers() { diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/UtilsTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/UtilsTest.java index 51732bd82833c..1d08e89899659 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/UtilsTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/UtilsTest.java @@ -20,6 +20,7 @@ import org.apache.kafka.common.header.Headers; import org.apache.kafka.common.header.internals.RecordHeaders; import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.common.utils.internals.ByteUtils; import org.apache.kafka.streams.state.StateSerdes; import org.apache.kafka.streams.state.ValueAndTimestamp; @@ -55,7 +56,7 @@ public class UtilsTest { // Header size's varint encoding cannot exceed 5 bytes (see @{link ByteUtils#readVarint(ByteBuffer)}) private static final int MAX_VARINT_SIZE = 5; private static final int OVERFLOW_HEADERS_SIZE = (1 + MAX_VARINT_SIZE) + HEADERS.length + StateSerdes.TIMESTAMP_SIZE + VALUE.length; - // 1 byte header size, 0 byte empty headers, and timetsamp + // 1 byte header size, 0 byte empty headers, and timestamp private static final int MIN_SIZE = 1 + 0 + StateSerdes.TIMESTAMP_SIZE; @Test @@ -132,7 +133,7 @@ public void testRawTimestampedValueWithSerdes() { final Headers headers = new RecordHeaders().add("key1", "value1".getBytes(StandardCharsets.UTF_8)); final ValueTimestampHeaders input = ValueTimestampHeaders.make(VALUE_STR, TIMESTAMP, headers); try ( - final ValueTimestampHeadersSerializer serializer = new ValueTimestampHeadersSerializer<>(Serdes.String().serializer()); + final ValueTimestampHeadersSerializer serializer = new ValueTimestampHeadersSerializer<>(new StringSerializer()); final ValueAndTimestampSerde stringSerde = new ValueAndTimestampSerde<>(Serdes.String()) ) { final byte[] inputBytes = serializer.serialize(TOPIC, input); @@ -157,9 +158,9 @@ public void testEmptyHeadersAndTimestamp() { @ParameterizedTest @ValueSource(bytes = { 0x10, 0x11 }) - public void testEmptyHeadersAndTimestampWithInvalidHeaderSizes(final int invalidSize) { + public void testEmptyHeadersAndTimestampWithInvalidHeaderSizes(final byte invalidSize) { final byte[] invalid = new byte[MIN_SIZE]; - invalid[0] = (byte) invalidSize; // header size + invalid[0] = invalidSize; // header size assertFalse(hasEmptyHeaders(invalid)); } @@ -186,12 +187,12 @@ public void testReadBytes() { } private static byte[] timestampedValueWithEmptyHeaders(final byte[] value) { - // header size: 1 byte, emtpy headers: 0 byte, timestamp: 8 bytes, plain value length + // header size: 1 byte, empty headers: 0 byte, timestamp: 8 bytes, plain value length final byte[] res = new byte[1 + 0 + StateSerdes.TIMESTAMP_SIZE + value.length]; final ByteBuffer buf = ByteBuffer.wrap(res); buf.put((byte) 0x00); // header size buf.putLong(TIMESTAMP); - buf.put(VALUE); + buf.put(value); return res; } From d3678b98a3dde39918bf5d971e7b12d8a01b1e89 Mon Sep 17 00:00:00 2001 From: "Matthias J. Sax" Date: Fri, 3 Apr 2026 09:40:08 -0700 Subject: [PATCH 3/6] fix incorrect test setup --- .../integration/TimestampedWindowStoreWithHeadersTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TimestampedWindowStoreWithHeadersTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TimestampedWindowStoreWithHeadersTest.java index 1ae592e8901cb..5ca2fb29e2744 100644 --- a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TimestampedWindowStoreWithHeadersTest.java +++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TimestampedWindowStoreWithHeadersTest.java @@ -199,7 +199,7 @@ public void shouldSetChangelogTopicProperties() throws Exception { // verify changelog topic properties final String changelogTopic = props.getProperty(StreamsConfig.APPLICATION_ID_CONFIG) + "-" + STORE_NAME + "-changelog"; final Properties changelogTopicConfig = CLUSTER.getLogConfig(changelogTopic); - assertEquals("compact", changelogTopicConfig.getProperty("cleanup.policy")); + assertEquals("compact,delete", changelogTopicConfig.getProperty("cleanup.policy")); } @Test From 900aa31b6c3ce47dfd117754e6cccce715f75737 Mon Sep 17 00:00:00 2001 From: "Matthias J. Sax" Date: Fri, 3 Apr 2026 09:41:24 -0700 Subject: [PATCH 4/6] fix incorrect public method name --- .../integration/HeadersStoreUpgradeIntegrationTest.java | 6 +++--- .../kstream/internals/SessionStoreMaterializer.java | 2 +- .../streams/processor/internals/StoreBuilderWrapper.java | 8 ++++---- .../main/java/org/apache/kafka/streams/state/Stores.java | 6 +++--- ...thHeaders.java => SessionStoreWithHeadersBuilder.java} | 4 ++-- .../KStreamSessionWindowAggregateProcessorTest.java | 2 +- .../streams/state/QueryableStoreTypesWithHeadersTest.java | 2 +- .../SessionStoreWithHeadersSerializerSideEffectTest.java | 2 +- 8 files changed, 16 insertions(+), 16 deletions(-) rename streams/src/main/java/org/apache/kafka/streams/state/internals/{SessionStoreBuilderWithHeaders.java => SessionStoreWithHeadersBuilder.java} (98%) diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/HeadersStoreUpgradeIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/HeadersStoreUpgradeIntegrationTest.java index 25fb33dd18225..3d95a1683739a 100644 --- a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/HeadersStoreUpgradeIntegrationTest.java +++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/HeadersStoreUpgradeIntegrationTest.java @@ -1622,7 +1622,7 @@ private void shouldMigrateSessionStoreToSessionStoreWithHeaders(final boolean is final StreamsBuilder newBuilder = new StreamsBuilder(); final AtomicReference processorRef = new AtomicReference<>(); newBuilder.addStateStore( - Stores.sessionStoreBuilderWithHeaders( + Stores.sessionStoreWithHeadersBuilder( isPersistent ? Stores.persistentSessionStoreWithHeaders(SESSION_STORE_NAME, Duration.ofMillis(RETENTION_MS)) : Stores.inMemorySessionStore(SESSION_STORE_NAME, Duration.ofMillis(RETENTION_MS)), Serdes.String(), @@ -1680,7 +1680,7 @@ public void shouldProxySessionStoreToSessionStoreWithHeaders() throws Exception final StreamsBuilder newBuilder = new StreamsBuilder(); final AtomicReference processorRef = new AtomicReference<>(); newBuilder.addStateStore( - Stores.sessionStoreBuilderWithHeaders( + Stores.sessionStoreWithHeadersBuilder( Stores.persistentSessionStore(SESSION_STORE_NAME, Duration.ofMillis(RETENTION_MS)), // non-headers supplier! Serdes.String(), Serdes.String())) @@ -1941,7 +1941,7 @@ private void setupAndPopulateSessionStoreWithHeaders(final Properties props) thr final StreamsBuilder headersBuilder = new StreamsBuilder(); final AtomicReference processorRef = new AtomicReference<>(); headersBuilder.addStateStore( - Stores.sessionStoreBuilderWithHeaders( + Stores.sessionStoreWithHeadersBuilder( Stores.persistentSessionStoreWithHeaders(SESSION_STORE_NAME, Duration.ofMillis(RETENTION_MS)), Serdes.String(), Serdes.String())) diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionStoreMaterializer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionStoreMaterializer.java index d115708638261..31fa0a58c9fec 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionStoreMaterializer.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionStoreMaterializer.java @@ -70,7 +70,7 @@ public StoreBuilder builder() { final StoreBuilder builder; if (supplier instanceof HeadersBytesStoreSupplier) { - builder = Stores.sessionStoreBuilderWithHeaders( + builder = Stores.sessionStoreWithHeadersBuilder( supplier, materialized.keySerde(), materialized.valueSerde() diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreBuilderWrapper.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreBuilderWrapper.java index 418712188677d..9581f31270215 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreBuilderWrapper.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreBuilderWrapper.java @@ -19,7 +19,7 @@ import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.state.StoreBuilder; import org.apache.kafka.streams.state.internals.SessionStoreBuilder; -import org.apache.kafka.streams.state.internals.SessionStoreBuilderWithHeaders; +import org.apache.kafka.streams.state.internals.SessionStoreWithHeadersBuilder; import org.apache.kafka.streams.state.internals.TimestampedWindowStoreBuilder; import org.apache.kafka.streams.state.internals.TimestampedWindowStoreWithHeadersBuilder; import org.apache.kafka.streams.state.internals.VersionedKeyValueStoreBuilder; @@ -75,8 +75,8 @@ public long retentionPeriod() { return ((TimestampedWindowStoreWithHeadersBuilder) builder).retentionPeriod(); } else if (builder instanceof SessionStoreBuilder) { return ((SessionStoreBuilder) builder).retentionPeriod(); - } else if (builder instanceof SessionStoreBuilderWithHeaders) { - return ((SessionStoreBuilderWithHeaders) builder).retentionPeriod(); + } else if (builder instanceof SessionStoreWithHeadersBuilder) { + return ((SessionStoreWithHeadersBuilder) builder).retentionPeriod(); } else { throw new IllegalStateException("retentionPeriod is not supported when not a window store"); } @@ -112,7 +112,7 @@ public boolean isWindowStore() { || builder instanceof TimestampedWindowStoreBuilder || builder instanceof TimestampedWindowStoreWithHeadersBuilder || builder instanceof SessionStoreBuilder - || builder instanceof SessionStoreBuilderWithHeaders; + || builder instanceof SessionStoreWithHeadersBuilder; } @Override diff --git a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java index 14c37ed93b437..4dcff31361232 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java @@ -31,7 +31,7 @@ import org.apache.kafka.streams.state.internals.RocksDbVersionedKeyValueBytesStoreSupplier; import org.apache.kafka.streams.state.internals.RocksDbWindowBytesStoreSupplier; import org.apache.kafka.streams.state.internals.SessionStoreBuilder; -import org.apache.kafka.streams.state.internals.SessionStoreBuilderWithHeaders; +import org.apache.kafka.streams.state.internals.SessionStoreWithHeadersBuilder; import org.apache.kafka.streams.state.internals.TimestampedKeyValueStoreBuilder; import org.apache.kafka.streams.state.internals.TimestampedKeyValueStoreBuilderWithHeaders; import org.apache.kafka.streams.state.internals.TimestampedWindowStoreBuilder; @@ -658,12 +658,12 @@ public static StoreBuilder> sessionStoreBuilder(final * @param value type * @return an instance of {@link StoreBuilder} than can build a {@link SessionStoreWithHeaders} */ - public static StoreBuilder> sessionStoreBuilderWithHeaders( + public static StoreBuilder> sessionStoreWithHeadersBuilder( final SessionBytesStoreSupplier supplier, final Serde keySerde, final Serde valueSerde) { Objects.requireNonNull(supplier, "supplier cannot be null"); - return new SessionStoreBuilderWithHeaders<>(supplier, keySerde, valueSerde, Time.SYSTEM); + return new SessionStoreWithHeadersBuilder<>(supplier, keySerde, valueSerde, Time.SYSTEM); } /** diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionStoreBuilderWithHeaders.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionStoreWithHeadersBuilder.java similarity index 98% rename from streams/src/main/java/org/apache/kafka/streams/state/internals/SessionStoreBuilderWithHeaders.java rename to streams/src/main/java/org/apache/kafka/streams/state/internals/SessionStoreWithHeadersBuilder.java index d09886b3e0802..c6224976fa91d 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionStoreBuilderWithHeaders.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionStoreWithHeadersBuilder.java @@ -37,12 +37,12 @@ * {@link AggregationWithHeaders} as the value wrapper and wires up the * header-aware store stack (change-logging, caching, metering). */ -public class SessionStoreBuilderWithHeaders +public class SessionStoreWithHeadersBuilder extends AbstractStoreBuilder, SessionStoreWithHeaders> { private final SessionBytesStoreSupplier storeSupplier; - public SessionStoreBuilderWithHeaders(final SessionBytesStoreSupplier storeSupplier, + public SessionStoreWithHeadersBuilder(final SessionBytesStoreSupplier storeSupplier, final Serde keySerde, final Serde valueSerde, final Time time) { diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java index 5c18ef5662c64..a43b56891062b 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java @@ -154,7 +154,7 @@ private void initStore(final boolean enableCaching) { Stores.persistentSessionStore(STORE_NAME, ofMillis(GAP_MS * 3)); final StoreBuilder> storeBuilder = - Stores.sessionStoreBuilderWithHeaders(supplier, Serdes.String(), Serdes.Long()) + Stores.sessionStoreWithHeadersBuilder(supplier, Serdes.String(), Serdes.Long()) .withLoggingDisabled(); if (enableCaching && emitStrategy.type() != EmitStrategy.StrategyType.ON_WINDOW_CLOSE) { diff --git a/streams/src/test/java/org/apache/kafka/streams/state/QueryableStoreTypesWithHeadersTest.java b/streams/src/test/java/org/apache/kafka/streams/state/QueryableStoreTypesWithHeadersTest.java index e361daae09de1..eafe2c5a39c5a 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/QueryableStoreTypesWithHeadersTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/QueryableStoreTypesWithHeadersTest.java @@ -200,7 +200,7 @@ public void shouldAcceptRegularWindowStoreForWindowStoreType() { @Test public void shouldAcceptSessionStoreWithHeadersForSessionStoreType() { final SessionStoreWithHeaders store = - Stores.sessionStoreBuilderWithHeaders( + Stores.sessionStoreWithHeadersBuilder( Stores.inMemorySessionStore( "test-session-store", Duration.ofMillis(100)), diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionStoreWithHeadersSerializerSideEffectTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionStoreWithHeadersSerializerSideEffectTest.java index b5eda458c78fd..500fb9a7cfe3f 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionStoreWithHeadersSerializerSideEffectTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionStoreWithHeadersSerializerSideEffectTest.java @@ -135,7 +135,7 @@ public void shouldPropagateSerializerHeaderSideEffectToChangelog() { // Create a session store with headers using our custom serializer builder.addStateStore( - Stores.sessionStoreBuilderWithHeaders( + Stores.sessionStoreWithHeadersBuilder( Stores.inMemorySessionStore( STORE_NAME, Duration.ofMillis(10000L) From 9b3c0f382bada5ea02662df6ebf8424c02a91494 Mon Sep 17 00:00:00 2001 From: "Matthias J. Sax" Date: Fri, 3 Apr 2026 10:12:13 -0700 Subject: [PATCH 5/6] add new test --- .../SessionsStoreWithHeadersTest.java | 531 ++++++++++++++++++ ...TimestampedWindowStoreWithHeadersTest.java | 8 +- 2 files changed, 535 insertions(+), 4 deletions(-) create mode 100644 streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SessionsStoreWithHeadersTest.java diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SessionsStoreWithHeadersTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SessionsStoreWithHeadersTest.java new file mode 100644 index 0000000000000..bf18dce66e585 --- /dev/null +++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SessionsStoreWithHeadersTest.java @@ -0,0 +1,531 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.integration; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.header.Headers; +import org.apache.kafka.common.header.internals.RecordHeaders; +import org.apache.kafka.common.serialization.IntegerDeserializer; +import org.apache.kafka.common.serialization.IntegerSerializer; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.Topology; +import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; +import org.apache.kafka.streams.integration.utils.IntegrationTestUtils; +import org.apache.kafka.streams.kstream.Consumed; +import org.apache.kafka.streams.kstream.Produced; +import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.kstream.internals.SessionWindow; +import org.apache.kafka.streams.processor.api.Processor; +import org.apache.kafka.streams.processor.api.ProcessorContext; +import org.apache.kafka.streams.processor.api.Record; +import org.apache.kafka.streams.state.AggregationWithHeaders; +import org.apache.kafka.streams.state.SessionStore; +import org.apache.kafka.streams.state.SessionStoreWithHeaders; +import org.apache.kafka.streams.state.Stores; +import org.apache.kafka.test.TestUtils; + +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInfo; + +import java.io.IOException; +import java.time.Duration; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Properties; + +import static org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName; +import static org.junit.jupiter.api.Assertions.assertEquals; + +@Tag("integration") +public class SessionsStoreWithHeadersTest { + + private static final String STORE_NAME = "headers-session-store"; + private static final long WINDOW_GAP_MS = 100L; + private static final long RETENTION_MS = 1000L; + + private String inputStream; + private String outputStream; + private long baseTimestamp; + + private KafkaStreams kafkaStreams; + + private static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1); + + private static final Headers HEADERS1 = new RecordHeaders() + .add("source", "test".getBytes()) + .add("version", "1.0".getBytes()); + + private static final Headers HEADERS2 = new RecordHeaders() + .add("source", "test".getBytes()) + .add("version", "2.0".getBytes()); + + private static final Headers EMPTY_HEADERS = new RecordHeaders(); + + public TestInfo testInfo; + + @BeforeAll + public static void before() throws IOException { + CLUSTER.start(); + } + + @AfterAll + public static void after() { + CLUSTER.stop(); + } + + @BeforeEach + public void beforeTest(final TestInfo testInfo) { + this.testInfo = testInfo; + final String uniqueTestName = safeUniqueTestName(testInfo); + inputStream = "input-stream-" + uniqueTestName; + outputStream = "output-stream-" + uniqueTestName; + CLUSTER.createTopic(inputStream); + CLUSTER.createTopic(outputStream); + + baseTimestamp = CLUSTER.time.milliseconds(); + } + + @AfterEach + public void afterTest() { + if (kafkaStreams != null) { + kafkaStreams.close(Duration.ofSeconds(30L)); + kafkaStreams.cleanUp(); + } + } + + @Test + public void shouldPutFetchAndDelete() throws Exception { + final StreamsBuilder streamsBuilder = new StreamsBuilder(); + + streamsBuilder.addStateStore( + Stores.sessionStoreWithHeadersBuilder( + Stores.persistentSessionStoreWithHeaders(STORE_NAME, Duration.ofMillis(RETENTION_MS)), + Serdes.Integer(), + Serdes.String() + ) + ) + .stream(inputStream, Consumed.with(Serdes.Integer(), Serdes.String())) + .process(() -> new SessionStoreWithHeadersContentCheckerProcessor(true), STORE_NAME) + .to(outputStream, Produced.with(Serdes.Integer(), Serdes.Integer())); + + final Properties props = props(); + kafkaStreams = new KafkaStreams(streamsBuilder.build(), props); + kafkaStreams.start(); + + // produce source data with headers + int numRecordsProduced = 0; + + // Window 1: [baseTimestamp, baseTimestamp + WINDOW_SIZE_MS) + numRecordsProduced += produceDataToTopicWithHeaders(inputStream, baseTimestamp, HEADERS1, + KeyValue.pair(1, "a0"), KeyValue.pair(2, "b0"), KeyValue.pair(3, null)); + + // Window 1: updates in same window + numRecordsProduced += produceDataToTopicWithHeaders(inputStream, baseTimestamp + 50, HEADERS2, + KeyValue.pair(1, "a50"), KeyValue.pair(2, null), KeyValue.pair(3, "c50")); + + // Window 2: [baseTimestamp + WINDOW_SIZE_MS, baseTimestamp + 2 * WINDOW_SIZE_MS) + numRecordsProduced += produceDataToTopicWithHeaders(inputStream, baseTimestamp + WINDOW_GAP_MS, + EMPTY_HEADERS, + KeyValue.pair(1, "a100"), KeyValue.pair(2, "b100"), KeyValue.pair(3, null)); + + final List> receivedRecords = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived( + TestUtils.consumerConfig( + CLUSTER.bootstrapServers(), + IntegerDeserializer.class, + IntegerDeserializer.class), + outputStream, + numRecordsProduced); + + receivedRecords.forEach(receivedRecord -> assertEquals(0, receivedRecord.value)); + } + + @Test + public void shouldSetChangelogTopicProperties() throws Exception { + final StreamsBuilder streamsBuilder = new StreamsBuilder(); + + streamsBuilder.addStateStore( + Stores.sessionStoreWithHeadersBuilder( + Stores.persistentSessionStoreWithHeaders(STORE_NAME, Duration.ofMillis(RETENTION_MS)), + Serdes.Integer(), + Serdes.String() + ) + ) + .stream(inputStream, Consumed.with(Serdes.Integer(), Serdes.String())) + .process(() -> new SessionStoreWithHeadersContentCheckerProcessor(false), STORE_NAME) + .to(outputStream, Produced.with(Serdes.Integer(), Serdes.Integer())); + + final Properties props = props(); + kafkaStreams = new KafkaStreams(streamsBuilder.build(), props); + kafkaStreams.start(); + + produceDataToTopicWithHeaders(inputStream, baseTimestamp, new RecordHeaders(), KeyValue.pair(0, "foo")); + + IntegrationTestUtils.waitUntilMinRecordsReceived( + TestUtils.consumerConfig( + CLUSTER.bootstrapServers(), + IntegerDeserializer.class, + IntegerDeserializer.class), + outputStream, + 1); + + // verify changelog topic properties + final String changelogTopic = props.getProperty(StreamsConfig.APPLICATION_ID_CONFIG) + "-" + STORE_NAME + "-changelog"; + final Properties changelogTopicConfig = CLUSTER.getLogConfig(changelogTopic); + assertEquals("compact,delete", changelogTopicConfig.getProperty("cleanup.policy")); + } + + @Test + public void shouldRestore() throws Exception { + StreamsBuilder streamsBuilder = new StreamsBuilder(); + + streamsBuilder.addStateStore( + Stores.sessionStoreWithHeadersBuilder( + Stores.persistentSessionStoreWithHeaders(STORE_NAME, Duration.ofMillis(RETENTION_MS)), + Serdes.Integer(), + Serdes.String() + ) + ) + .stream(inputStream, Consumed.with(Serdes.Integer(), Serdes.String())) + .process(() -> new SessionStoreWithHeadersContentCheckerProcessor(true), STORE_NAME) + .to(outputStream, Produced.with(Serdes.Integer(), Serdes.Integer())); + + final Properties props = props(); + kafkaStreams = new KafkaStreams(streamsBuilder.build(), props); + kafkaStreams.start(); + + int initialRecordsProduced = 0; + + initialRecordsProduced += produceDataToTopicWithHeaders(inputStream, baseTimestamp, HEADERS1, + KeyValue.pair(1, "a0"), KeyValue.pair(2, "b0"), KeyValue.pair(3, null)); + + initialRecordsProduced += produceDataToTopicWithHeaders(inputStream, baseTimestamp + 50, HEADERS2, + KeyValue.pair(1, "a50"), KeyValue.pair(2, null), KeyValue.pair(3, "c50")); + + initialRecordsProduced += produceDataToTopicWithHeaders(inputStream, baseTimestamp + WINDOW_GAP_MS, EMPTY_HEADERS, + KeyValue.pair(1, "a100"), KeyValue.pair(2, "b100"), KeyValue.pair(3, "c100")); + + IntegrationTestUtils.waitUntilMinRecordsReceived( + TestUtils.consumerConfig( + CLUSTER.bootstrapServers(), + IntegerDeserializer.class, + IntegerDeserializer.class), + outputStream, + initialRecordsProduced); + + // wipe out state store to trigger restore process on restart + kafkaStreams.close(); + kafkaStreams.cleanUp(); + + // restart app - use processor WITHOUT validation of initial data, just write to store + streamsBuilder = new StreamsBuilder(); + + streamsBuilder.addStateStore( + Stores.sessionStoreWithHeadersBuilder( + Stores.persistentSessionStoreWithHeaders(STORE_NAME, Duration.ofMillis(RETENTION_MS)), + Serdes.Integer(), + Serdes.String() + ) + ) + .stream(inputStream, Consumed.with(Serdes.Integer(), Serdes.String())) + .process(() -> new SessionStoreWithHeadersContentCheckerProcessor(true), STORE_NAME) + .to(outputStream, Produced.with(Serdes.Integer(), Serdes.Integer())); + + kafkaStreams = new KafkaStreams(streamsBuilder.build(), props); + kafkaStreams.start(); + + // produce additional records to verify restored store works correctly + final Headers finalHeaders = new RecordHeaders().add("final", "true".getBytes()); + final int additionalRecordsProduced = produceDataToTopicWithHeaders(inputStream, baseTimestamp + 2 * WINDOW_GAP_MS, finalHeaders, + KeyValue.pair(1, "a200"), KeyValue.pair(2, "b200"), KeyValue.pair(3, "c200")); + + final List> receivedRecords = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived( + TestUtils.consumerConfig( + CLUSTER.bootstrapServers(), + IntegerDeserializer.class, + IntegerDeserializer.class), + outputStream, + initialRecordsProduced + additionalRecordsProduced); + + receivedRecords.forEach(receivedRecord -> assertEquals(0, receivedRecord.value)); + } + + @Test + public void shouldManualUpgradeFromTimestampedToHeaders() throws Exception { + final StreamsBuilder streamsBuilder = new StreamsBuilder(); + + streamsBuilder.addStateStore( + Stores.sessionStoreBuilder( + Stores.persistentSessionStore(STORE_NAME, Duration.ofMillis(RETENTION_MS)), + Serdes.Integer(), + Serdes.String() + ) + ) + .stream(inputStream, Consumed.with(Serdes.Integer(), Serdes.String())) + .process(SessionStoreContentCheckerProcessor::new, STORE_NAME) + .to(outputStream, Produced.with(Serdes.Integer(), Serdes.Integer())); + + shouldManualUpgradeFromTimestampedToHeaders(streamsBuilder.build()); + } + + private void shouldManualUpgradeFromTimestampedToHeaders(final Topology originalTopology) throws Exception { + // build original timestamped (legacy) topology and start app + final Properties props = props(); + kafkaStreams = new KafkaStreams(originalTopology, props); + kafkaStreams.start(); + + // produce source data to legacy timestamped store (without headers) + int initialRecordsProduced = 0; + initialRecordsProduced += produceDataToTopic(inputStream, baseTimestamp, + KeyValue.pair(1, "a0"), KeyValue.pair(2, "b0"), KeyValue.pair(3, null)); + initialRecordsProduced += produceDataToTopic(inputStream, baseTimestamp + 50, + KeyValue.pair(1, "a50"), KeyValue.pair(2, null), KeyValue.pair(3, "c50")); + initialRecordsProduced += produceDataToTopic(inputStream, baseTimestamp + WINDOW_GAP_MS, + KeyValue.pair(1, "a100"), KeyValue.pair(2, "b100"), KeyValue.pair(3, null)); + + List> receivedRecords = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived( + TestUtils.consumerConfig( + CLUSTER.bootstrapServers(), + IntegerDeserializer.class, + IntegerDeserializer.class), + outputStream, + initialRecordsProduced); + + receivedRecords.forEach(receivedRecord -> assertEquals(0, receivedRecord.value)); + + kafkaStreams.close(); + kafkaStreams.cleanUp(); + + // restart app with headers-aware store to test upgrade path + // The store should migrate legacy timestamped data (without headers) + // and add empty headers to existing data + final StreamsBuilder streamsBuilder = new StreamsBuilder(); + + streamsBuilder + .addStateStore( + Stores.sessionStoreWithHeadersBuilder( + Stores.persistentSessionStoreWithHeaders(STORE_NAME, Duration.ofMillis(RETENTION_MS)), + Serdes.Integer(), + Serdes.String() + ) + ) + .stream(inputStream, Consumed.with(Serdes.Integer(), Serdes.String())) + .process(() -> new SessionStoreWithHeadersContentCheckerProcessor(true), STORE_NAME) + .to(outputStream, Produced.with(Serdes.Integer(), Serdes.Integer())); + + kafkaStreams = new KafkaStreams(streamsBuilder.build(), props); + kafkaStreams.start(); + + // produce additional records with headers to verify upgraded store works + final Headers upgradedHeaders = new RecordHeaders().add("upgraded", "true".getBytes()); + final int additionalRecordsProduced = produceDataToTopicWithHeaders(inputStream, baseTimestamp + 2 * WINDOW_GAP_MS, upgradedHeaders, + KeyValue.pair(1, "a200"), KeyValue.pair(2, "b200"), KeyValue.pair(3, "c200")); + + receivedRecords = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived( + TestUtils.consumerConfig( + CLUSTER.bootstrapServers(), + IntegerDeserializer.class, + IntegerDeserializer.class), + outputStream, + initialRecordsProduced + additionalRecordsProduced); + + receivedRecords.forEach(receivedRecord -> assertEquals(0, receivedRecord.value)); + } + + private Properties props() { + final String safeTestName = safeUniqueTestName(testInfo); + final Properties streamsConfiguration = new Properties(); + streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName); + streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); + streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()); + streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000L); + streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + return streamsConfiguration; + } + + /** + * @return number of records produced + */ + @SuppressWarnings("varargs") + @SafeVarargs + private int produceDataToTopic(final String topic, + final long timestamp, + final KeyValue... keyValues) { + IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp( + topic, + Arrays.asList(keyValues), + TestUtils.producerConfig(CLUSTER.bootstrapServers(), + IntegerSerializer.class, + StringSerializer.class), + timestamp); + return keyValues.length; + } + + /** + * Produce records with headers. + * + * @return number of records produced + */ + @SuppressWarnings("varargs") + @SafeVarargs + private int produceDataToTopicWithHeaders(final String topic, + final long timestamp, + final Headers headers, + final KeyValue... keyValues) { + IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp( + topic, + Arrays.asList(keyValues), + TestUtils.producerConfig(CLUSTER.bootstrapServers(), + IntegerSerializer.class, + StringSerializer.class), + headers, + timestamp, + false); + return keyValues.length; + } + + /** + * Processor for validating expected contents of a session store with headers, and forwards + * the number of failed checks downstream for consumption. + */ + private static class SessionStoreWithHeadersContentCheckerProcessor implements Processor { + + private ProcessorContext context; + private SessionStoreWithHeaders store; + + // whether the processor should write records to the store as they arrive. + private final boolean writeToStore; + // in-memory copy of seen data, to validate for testing purposes. + // Maps windowed-key -> AggregationWithHeaders + private final Map, Optional>> data; + + SessionStoreWithHeadersContentCheckerProcessor(final boolean writeToStore) { + this.writeToStore = writeToStore; + this.data = new HashMap<>(); + } + + @Override + public void init(final ProcessorContext context) { + this.context = context; + store = context.getStateStore(STORE_NAME); + } + + @Override + public void process(final Record record) { + if (writeToStore) { + final AggregationWithHeaders valueHeaders = + AggregationWithHeaders.make(record.value(), record.headers()); + store.put(new Windowed<>(record.key(), new SessionWindow(record.timestamp(), record.timestamp())), valueHeaders); + + data.put(new Windowed<>(record.key(), new SessionWindow(record.timestamp(), record.timestamp())), Optional.ofNullable(valueHeaders)); + } + + // + final int failedChecks = checkStoreContents(); + context.forward(record.withValue(failedChecks)); + } + + /** + * Check expected contents of store, and signal completion by writing number of failures to downstream + * @return number of failed checks + */ + private int checkStoreContents() { + int failedChecks = 0; + for (final Map.Entry, Optional>> keyEntry : data.entrySet()) { + final Windowed windowedKey = keyEntry.getKey(); + final long sessionTime = windowedKey.window().start(); + + final AggregationWithHeaders expectedValueTHeaders = keyEntry.getValue().orElse(null); + final AggregationWithHeaders actualValueHeaders = store.fetchSession(windowedKey.key(), sessionTime, sessionTime); + + if (!Objects.equals(actualValueHeaders, expectedValueTHeaders)) { + failedChecks++; + } + } + + return failedChecks; + } + } + + /** + * Processor for validating expected contents of a session store (without headers). + * Used for testing the upgrade path from SessionStore to SessionStoreWithHeaders. + */ + private static class SessionStoreContentCheckerProcessor implements Processor { + + private ProcessorContext context; + private SessionStore store; + + // in-memory copy of seen data, to validate for testing purposes. + // Maps windowed-key -> value + private final Map, Optional> data; + + SessionStoreContentCheckerProcessor() { + this.data = new HashMap<>(); + } + + @Override + public void init(final ProcessorContext context) { + this.context = context; + store = context.getStateStore(STORE_NAME); + } + + @Override + public void process(final Record record) { + + store.put(new Windowed<>(record.key(), new SessionWindow(record.timestamp(), record.timestamp())), record.value()); + + data.put(new Windowed<>(record.key(), new SessionWindow(record.timestamp(), record.timestamp())), Optional.ofNullable(record.value())); + + final int failedChecks = checkStoreContents(); + context.forward(record.withValue(failedChecks)); + } + + /** + * Check expected contents of store, and signal completion by writing + * @return number of failed checks + */ + private int checkStoreContents() { + int failedChecks = 0; + for (final Map.Entry, Optional> keyEntry : data.entrySet()) { + final Windowed windowedKey = keyEntry.getKey(); + final long sessionTime = windowedKey.window().start(); + + final String expectedValue = keyEntry.getValue().orElse(null); + final String actualValue = store.fetchSession(windowedKey.key(), sessionTime, sessionTime); + + if (!Objects.equals(actualValue, expectedValue)) { + failedChecks++; + } + } + return failedChecks; + } + } +} diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TimestampedWindowStoreWithHeadersTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TimestampedWindowStoreWithHeadersTest.java index 5ca2fb29e2744..012bc773df6c5 100644 --- a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TimestampedWindowStoreWithHeadersTest.java +++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TimestampedWindowStoreWithHeadersTest.java @@ -102,7 +102,7 @@ public static void after() { } @BeforeEach - public void beforeTest(final TestInfo testInfo) throws InterruptedException { + public void beforeTest(final TestInfo testInfo) { this.testInfo = testInfo; final String uniqueTestName = safeUniqueTestName(testInfo); inputStream = "input-stream-" + uniqueTestName; @@ -375,9 +375,9 @@ private Properties props() { */ @SuppressWarnings("varargs") @SafeVarargs - private final int produceDataToTopic(final String topic, - final long timestamp, - final KeyValue... keyValues) { + private int produceDataToTopic(final String topic, + final long timestamp, + final KeyValue... keyValues) { IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp( topic, Arrays.asList(keyValues), From ea8324817a936d94051f865127fc807f20cb892f Mon Sep 17 00:00:00 2001 From: "Matthias J. Sax" Date: Fri, 3 Apr 2026 14:33:25 -0700 Subject: [PATCH 6/6] fix method name for header-kv-store builder --- .../HeadersStoreUpgradeIntegrationTest.java | 10 +- ...mestampedKeyValueStoreWithHeadersTest.java | 10 +- .../internals/KeyValueStoreMaterializer.java | 2 +- .../internals/SubscriptionStoreFactory.java | 2 +- .../apache/kafka/streams/state/Stores.java | 179 +++++++++++------- ...oreignTableJoinProcessorSupplierTests.java | 2 +- ...scriptionReceiveProcessorSupplierTest.java | 2 +- .../QueryableStoreTypesWithHeadersTest.java | 6 +- .../GlobalStateStoreProviderTest.java | 2 +- .../StreamThreadStateStoreProviderTest.java | 2 +- ...reWithHeadersSerializerSideEffectTest.java | 2 +- 11 files changed, 128 insertions(+), 91 deletions(-) diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/HeadersStoreUpgradeIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/HeadersStoreUpgradeIntegrationTest.java index 3d95a1683739a..5ba3f95f47d39 100644 --- a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/HeadersStoreUpgradeIntegrationTest.java +++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/HeadersStoreUpgradeIntegrationTest.java @@ -169,7 +169,7 @@ private void shouldMigrateTimestampedKeyValueStoreToTimestampedKeyValueStoreWith final StreamsBuilder streamsBuilderForNewStore = new StreamsBuilder(); streamsBuilderForNewStore.addStateStore( - Stores.timestampedKeyValueStoreBuilderWithHeaders( + Stores.timestampedKeyValueStoreWithHeadersBuilder( persistentStore ? Stores.persistentTimestampedKeyValueStoreWithHeaders(STORE_NAME) : Stores.inMemoryKeyValueStore(STORE_NAME), Serdes.String(), Serdes.String())) @@ -222,7 +222,7 @@ public void shouldProxyTimestampedKeyValueStoreToTimestampedKeyValueStoreWithHea final StreamsBuilder streamsBuilderForNewStore = new StreamsBuilder(); streamsBuilderForNewStore.addStateStore( - Stores.timestampedKeyValueStoreBuilderWithHeaders( + Stores.timestampedKeyValueStoreWithHeadersBuilder( Stores.persistentTimestampedKeyValueStore(STORE_NAME), Serdes.String(), Serdes.String())) @@ -288,7 +288,7 @@ private void shouldMigratePlainKeyValueStoreToTimestampedKeyValueStoreWithHeader final StreamsBuilder streamsBuilderForNewStore = new StreamsBuilder(); streamsBuilderForNewStore.addStateStore( - Stores.timestampedKeyValueStoreBuilderWithHeaders( + Stores.timestampedKeyValueStoreWithHeadersBuilder( persistentStore ? Stores.persistentTimestampedKeyValueStoreWithHeaders(STORE_NAME) : Stores.inMemoryKeyValueStore(STORE_NAME), Serdes.String(), Serdes.String())) @@ -341,7 +341,7 @@ public void shouldProxyPlainKeyValueStoreToTimestampedKeyValueStoreWithHeadersUs final StreamsBuilder streamsBuilderForNewStore = new StreamsBuilder(); streamsBuilderForNewStore.addStateStore( - Stores.timestampedKeyValueStoreBuilderWithHeaders( + Stores.timestampedKeyValueStoreWithHeadersBuilder( Stores.persistentKeyValueStore(STORE_NAME), Serdes.String(), Serdes.String())) @@ -1563,7 +1563,7 @@ private void produceRecordWithHeaders(final String key, final String value, fina private void setupAndPopulateKeyValueStoreWithHeaders(final Properties props) throws Exception { final StreamsBuilder headersBuilder = new StreamsBuilder(); headersBuilder.addStateStore( - Stores.timestampedKeyValueStoreBuilderWithHeaders( + Stores.timestampedKeyValueStoreWithHeadersBuilder( Stores.persistentTimestampedKeyValueStoreWithHeaders(STORE_NAME), Serdes.String(), Serdes.String())) diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TimestampedKeyValueStoreWithHeadersTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TimestampedKeyValueStoreWithHeadersTest.java index b7cbdb2d82d85..bb110d86cdd2b 100644 --- a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TimestampedKeyValueStoreWithHeadersTest.java +++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TimestampedKeyValueStoreWithHeadersTest.java @@ -125,7 +125,7 @@ public void shouldPutGetAndDelete() throws Exception { streamsBuilder .addStateStore( - Stores.timestampedKeyValueStoreBuilderWithHeaders( + Stores.timestampedKeyValueStoreWithHeadersBuilder( Stores.persistentTimestampedKeyValueStoreWithHeaders(STORE_NAME), Serdes.Integer(), Serdes.String() @@ -173,7 +173,7 @@ public void shouldSetChangelogTopicProperties() throws Exception { streamsBuilder .addStateStore( - Stores.timestampedKeyValueStoreBuilderWithHeaders( + Stores.timestampedKeyValueStoreWithHeadersBuilder( Stores.persistentTimestampedKeyValueStoreWithHeaders(STORE_NAME), Serdes.Integer(), Serdes.String() @@ -210,7 +210,7 @@ public void shouldRestore() throws Exception { streamsBuilder .addStateStore( - Stores.timestampedKeyValueStoreBuilderWithHeaders( + Stores.timestampedKeyValueStoreWithHeadersBuilder( Stores.persistentTimestampedKeyValueStoreWithHeaders(STORE_NAME), Serdes.Integer(), Serdes.String() @@ -264,7 +264,7 @@ public void shouldRestore() throws Exception { streamsBuilder .addStateStore( - Stores.timestampedKeyValueStoreBuilderWithHeaders( + Stores.timestampedKeyValueStoreWithHeadersBuilder( Stores.persistentTimestampedKeyValueStoreWithHeaders(STORE_NAME), Serdes.Integer(), Serdes.String() @@ -356,7 +356,7 @@ private void shouldManualUpgradeFromTimestampedToHeaders(final Topology original streamsBuilder .addStateStore( - Stores.timestampedKeyValueStoreBuilderWithHeaders( + Stores.timestampedKeyValueStoreWithHeadersBuilder( Stores.persistentTimestampedKeyValueStoreWithHeaders(STORE_NAME), Serdes.Integer(), Serdes.String() diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValueStoreMaterializer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValueStoreMaterializer.java index 30b3b885c1018..3e23c409df01f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValueStoreMaterializer.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValueStoreMaterializer.java @@ -57,7 +57,7 @@ public StoreBuilder builder() { materialized.keySerde(), materialized.valueSerde()); } else if (supplier instanceof HeadersBytesStoreSupplier) { - builder = Stores.timestampedKeyValueStoreBuilderWithHeaders( + builder = Stores.timestampedKeyValueStoreWithHeadersBuilder( supplier, materialized.keySerde(), materialized.valueSerde()); diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SubscriptionStoreFactory.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SubscriptionStoreFactory.java index 2aed2f74d341c..94b59fd0c117c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SubscriptionStoreFactory.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SubscriptionStoreFactory.java @@ -47,7 +47,7 @@ public SubscriptionStoreFactory( @Override public StoreBuilder builder() { StoreBuilder builder; - builder = Stores.timestampedKeyValueStoreBuilderWithHeaders( + builder = Stores.timestampedKeyValueStoreWithHeadersBuilder( dslStoreSuppliers().keyValueStore(new DslKeyValueParams(name, dslStoreFormat())), new Serdes.BytesSerde(), subscriptionWrapperSerde diff --git a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java index 4dcff31361232..d567d525dc39a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java @@ -125,7 +125,7 @@ public static KeyValueBytesStoreSupplier persistentTimestampedKeyValueStore(fina * Create a persistent {@link KeyValueBytesStoreSupplier} that stores headers along with timestamps. *

* This store supplier can be passed into a - * {@link #timestampedKeyValueStoreBuilderWithHeaders(KeyValueBytesStoreSupplier, Serde, Serde)} + * {@link #timestampedKeyValueStoreWithHeadersBuilder(KeyValueBytesStoreSupplier, Serde, Serde)} * to build a {@link TimestampedKeyValueStoreWithHeaders}. *

* The store will persist key-value pairs along with record timestamps and headers, @@ -168,8 +168,10 @@ public static KeyValueBytesStoreSupplier persistentTimestampedKeyValueStoreWithH * @return an instance of {@link VersionedBytesStoreSupplier} * @throws IllegalArgumentException if {@code historyRetention} can't be represented as {@code long milliseconds} */ - public static VersionedBytesStoreSupplier persistentVersionedKeyValueStore(final String name, - final Duration historyRetention) { + public static VersionedBytesStoreSupplier persistentVersionedKeyValueStore( + final String name, + final Duration historyRetention + ) { Objects.requireNonNull(name, "name cannot be null"); final String hrMsgPrefix = prepareMillisCheckFailMsgPrefix(historyRetention, "historyRetention"); final long historyRetentionMs = validateMillisecondDuration(historyRetention, hrMsgPrefix); @@ -206,9 +208,11 @@ public static VersionedBytesStoreSupplier persistentVersionedKeyValueStore(final * @return an instance of {@link VersionedBytesStoreSupplier} * @throws IllegalArgumentException if {@code historyRetention} or {@code segmentInterval} can't be represented as {@code long milliseconds} */ - public static VersionedBytesStoreSupplier persistentVersionedKeyValueStore(final String name, - final Duration historyRetention, - final Duration segmentInterval) { + public static VersionedBytesStoreSupplier persistentVersionedKeyValueStore( + final String name, + final Duration historyRetention, + final Duration segmentInterval + ) { Objects.requireNonNull(name, "name cannot be null"); final String hrMsgPrefix = prepareMillisCheckFailMsgPrefix(historyRetention, "historyRetention"); final long historyRetentionMs = validateMillisecondDuration(historyRetention, hrMsgPrefix); @@ -297,10 +301,12 @@ public String metricsScope() { * @throws IllegalArgumentException if {@code retentionPeriod} or {@code windowSize} can't be represented as {@code long milliseconds} * @throws IllegalArgumentException if {@code retentionPeriod} is smaller than {@code windowSize} */ - public static WindowBytesStoreSupplier persistentWindowStore(final String name, - final Duration retentionPeriod, - final Duration windowSize, - final boolean retainDuplicates) throws IllegalArgumentException { + public static WindowBytesStoreSupplier persistentWindowStore( + final String name, + final Duration retentionPeriod, + final Duration windowSize, + final boolean retainDuplicates + ) throws IllegalArgumentException { return persistentWindowStore(name, retentionPeriod, windowSize, retainDuplicates, RocksDbWindowBytesStoreSupplier.WindowStoreTypes.DEFAULT_WINDOW_STORE); } @@ -329,10 +335,12 @@ public static WindowBytesStoreSupplier persistentWindowStore(final String name, * @throws IllegalArgumentException if {@code retentionPeriod} or {@code windowSize} can't be represented as {@code long milliseconds} * @throws IllegalArgumentException if {@code retentionPeriod} is smaller than {@code windowSize} */ - public static WindowBytesStoreSupplier persistentTimestampedWindowStore(final String name, - final Duration retentionPeriod, - final Duration windowSize, - final boolean retainDuplicates) throws IllegalArgumentException { + public static WindowBytesStoreSupplier persistentTimestampedWindowStore( + final String name, + final Duration retentionPeriod, + final Duration windowSize, + final boolean retainDuplicates + ) throws IllegalArgumentException { return persistentWindowStore(name, retentionPeriod, windowSize, retainDuplicates, RocksDbWindowBytesStoreSupplier.WindowStoreTypes.TIMESTAMPED_WINDOW_STORE); } @@ -346,18 +354,22 @@ public static WindowBytesStoreSupplier persistentTimestampedWindowStore(final St * @return an instance of {@link WindowBytesStoreSupplier} * @throws IllegalArgumentException if {@code retentionPeriod} is smaller than {@code windowSize} */ - public static WindowBytesStoreSupplier persistentTimestampedWindowStoreWithHeaders(final String name, - final Duration retentionPeriod, - final Duration windowSize, - final boolean retainDuplicates) throws IllegalArgumentException { + public static WindowBytesStoreSupplier persistentTimestampedWindowStoreWithHeaders( + final String name, + final Duration retentionPeriod, + final Duration windowSize, + final boolean retainDuplicates + ) throws IllegalArgumentException { return persistentWindowStore(name, retentionPeriod, windowSize, retainDuplicates, RocksDbWindowBytesStoreSupplier.WindowStoreTypes.TIMESTAMPED_WINDOW_STORE_WITH_HEADERS); } - private static WindowBytesStoreSupplier persistentWindowStore(final String name, - final Duration retentionPeriod, - final Duration windowSize, - final boolean retainDuplicates, - final RocksDbWindowBytesStoreSupplier.WindowStoreTypes storeType) { + private static WindowBytesStoreSupplier persistentWindowStore( + final String name, + final Duration retentionPeriod, + final Duration windowSize, + final boolean retainDuplicates, + final RocksDbWindowBytesStoreSupplier.WindowStoreTypes storeType + ) { Objects.requireNonNull(name, "name cannot be null"); final String rpMsgPrefix = prepareMillisCheckFailMsgPrefix(retentionPeriod, "retentionPeriod"); final long retentionMs = validateMillisecondDuration(retentionPeriod, rpMsgPrefix); @@ -406,10 +418,12 @@ private static WindowBytesStoreSupplier persistentWindowStore(final String name, * @throws IllegalArgumentException if {@code retentionPeriod} or {@code windowSize} can't be represented as {@code long milliseconds} * @throws IllegalArgumentException if {@code retentionPeriod} is smaller than {@code windowSize} */ - public static WindowBytesStoreSupplier inMemoryWindowStore(final String name, - final Duration retentionPeriod, - final Duration windowSize, - final boolean retainDuplicates) throws IllegalArgumentException { + public static WindowBytesStoreSupplier inMemoryWindowStore( + final String name, + final Duration retentionPeriod, + final Duration windowSize, + final boolean retainDuplicates + ) throws IllegalArgumentException { Objects.requireNonNull(name, "name cannot be null"); final String repartitionPeriodErrorMessagePrefix = prepareMillisCheckFailMsgPrefix(retentionPeriod, "retentionPeriod"); @@ -447,8 +461,10 @@ public static WindowBytesStoreSupplier inMemoryWindowStore(final String name, * contain the inactivity gap of the session and the entire grace period.) * @return an instance of a {@link SessionBytesStoreSupplier} */ - public static SessionBytesStoreSupplier persistentSessionStore(final String name, - final Duration retentionPeriod) { + public static SessionBytesStoreSupplier persistentSessionStore( + final String name, + final Duration retentionPeriod + ) { return persistentSessionStore(name, retentionPeriod, false); } @@ -466,14 +482,18 @@ public static SessionBytesStoreSupplier persistentSessionStore(final String name * contain the inactivity gap of the session and the entire grace period.) * @return an instance of a {@link SessionBytesStoreSupplier} */ - public static SessionBytesStoreSupplier persistentSessionStoreWithHeaders(final String name, - final Duration retentionPeriod) { + public static SessionBytesStoreSupplier persistentSessionStoreWithHeaders( + final String name, + final Duration retentionPeriod + ) { return persistentSessionStore(name, retentionPeriod, true); } - private static SessionBytesStoreSupplier persistentSessionStore(final String name, - final Duration retentionPeriod, - final boolean withHeaders) { + private static SessionBytesStoreSupplier persistentSessionStore( + final String name, + final Duration retentionPeriod, + final boolean withHeaders + ) { Objects.requireNonNull(name, "name cannot be null"); final String msgPrefix = prepareMillisCheckFailMsgPrefix(retentionPeriod, "retentionPeriod"); final long retentionPeriodMs = validateMillisecondDuration(retentionPeriod, msgPrefix); @@ -521,9 +541,11 @@ public static SessionBytesStoreSupplier inMemorySessionStore(final String name, * @param value type * @return an instance of a {@link StoreBuilder} that can build a {@link KeyValueStore} */ - public static StoreBuilder> keyValueStoreBuilder(final KeyValueBytesStoreSupplier supplier, - final Serde keySerde, - final Serde valueSerde) { + public static StoreBuilder> keyValueStoreBuilder( + final KeyValueBytesStoreSupplier supplier, + final Serde keySerde, + final Serde valueSerde + ) { Objects.requireNonNull(supplier, "supplier cannot be null"); return new KeyValueStoreBuilder<>(supplier, keySerde, valueSerde, Time.SYSTEM); } @@ -543,13 +565,35 @@ public static StoreBuilder> keyValueStoreBuilder(fina * @param value type * @return an instance of a {@link StoreBuilder} that can build a {@link KeyValueStore} */ - public static StoreBuilder> timestampedKeyValueStoreBuilder(final KeyValueBytesStoreSupplier supplier, - final Serde keySerde, - final Serde valueSerde) { + public static StoreBuilder> timestampedKeyValueStoreBuilder( + final KeyValueBytesStoreSupplier supplier, + final Serde keySerde, + final Serde valueSerde + ) { Objects.requireNonNull(supplier, "supplier cannot be null"); return new TimestampedKeyValueStoreBuilder<>(supplier, keySerde, valueSerde, Time.SYSTEM); } + /** + * Creates a {@link StoreBuilder} that can be used to build a {@link TimestampedKeyValueStoreWithHeaders}. + * + * @param supplier a {@link KeyValueBytesStoreSupplier} (cannot be {@code null}) + * @param keySerde the key serde to use + * @param valueSerde the value serde to use; if the serialized bytes is {@code null} for put operations, + * it is treated as delete + * @param key type + * @param value type + * @return an instance of {@link StoreBuilder} than can build a {@link KeyValueStore} + */ + public static StoreBuilder> timestampedKeyValueStoreWithHeadersBuilder( + final KeyValueBytesStoreSupplier supplier, + final Serde keySerde, + final Serde valueSerde + ) { + Objects.requireNonNull(supplier, "supplier cannot be null"); + return new TimestampedKeyValueStoreBuilderWithHeaders<>(supplier, keySerde, valueSerde, Time.SYSTEM); + } + /** * Creates a {@link StoreBuilder} that can be used to build a {@link VersionedKeyValueStore}. * @@ -561,9 +605,11 @@ public static StoreBuilder> timestampedKey * @param value type * @return an instance of a {@link StoreBuilder} that can build a {@link VersionedKeyValueStore} */ - public static StoreBuilder> versionedKeyValueStoreBuilder(final VersionedBytesStoreSupplier supplier, - final Serde keySerde, - final Serde valueSerde) { + public static StoreBuilder> versionedKeyValueStoreBuilder( + final VersionedBytesStoreSupplier supplier, + final Serde keySerde, + final Serde valueSerde + ) { Objects.requireNonNull(supplier, "supplier cannot be null"); return new VersionedKeyValueStoreBuilder<>(supplier, keySerde, valueSerde, Time.SYSTEM); } @@ -582,9 +628,11 @@ public static StoreBuilder> versionedKeyValu * @param value type * @return an instance of {@link StoreBuilder} than can build a {@link WindowStore} */ - public static StoreBuilder> windowStoreBuilder(final WindowBytesStoreSupplier supplier, - final Serde keySerde, - final Serde valueSerde) { + public static StoreBuilder> windowStoreBuilder( + final WindowBytesStoreSupplier supplier, + final Serde keySerde, + final Serde valueSerde + ) { Objects.requireNonNull(supplier, "supplier cannot be null"); return new WindowStoreBuilder<>(supplier, keySerde, valueSerde, Time.SYSTEM); } @@ -604,9 +652,11 @@ public static StoreBuilder> windowStoreBuilder(final Wi * @param value type * @return an instance of {@link StoreBuilder} that can build a {@link TimestampedWindowStore} */ - public static StoreBuilder> timestampedWindowStoreBuilder(final WindowBytesStoreSupplier supplier, - final Serde keySerde, - final Serde valueSerde) { + public static StoreBuilder> timestampedWindowStoreBuilder( + final WindowBytesStoreSupplier supplier, + final Serde keySerde, + final Serde valueSerde + ) { Objects.requireNonNull(supplier, "supplier cannot be null"); return new TimestampedWindowStoreBuilder<>(supplier, keySerde, valueSerde, Time.SYSTEM); } @@ -624,7 +674,8 @@ public static StoreBuilder> timestampedWindo public static StoreBuilder> timestampedWindowStoreWithHeadersBuilder( final WindowBytesStoreSupplier supplier, final Serde keySerde, - final Serde valueSerde) { + final Serde valueSerde + ) { Objects.requireNonNull(supplier, "supplier cannot be null"); return new TimestampedWindowStoreWithHeadersBuilder<>(supplier, keySerde, valueSerde, Time.SYSTEM); } @@ -640,9 +691,11 @@ public static StoreBuilder> times * @param value type * @return an instance of {@link StoreBuilder} than can build a {@link SessionStore} */ - public static StoreBuilder> sessionStoreBuilder(final SessionBytesStoreSupplier supplier, - final Serde keySerde, - final Serde valueSerde) { + public static StoreBuilder> sessionStoreBuilder( + final SessionBytesStoreSupplier supplier, + final Serde keySerde, + final Serde valueSerde + ) { Objects.requireNonNull(supplier, "supplier cannot be null"); return new SessionStoreBuilder<>(supplier, keySerde, valueSerde, Time.SYSTEM); } @@ -661,26 +714,10 @@ public static StoreBuilder> sessionStoreBuilder(final public static StoreBuilder> sessionStoreWithHeadersBuilder( final SessionBytesStoreSupplier supplier, final Serde keySerde, - final Serde valueSerde) { + final Serde valueSerde + ) { Objects.requireNonNull(supplier, "supplier cannot be null"); return new SessionStoreWithHeadersBuilder<>(supplier, keySerde, valueSerde, Time.SYSTEM); } - /** - * Creates a {@link StoreBuilder} that can be used to build a {@link TimestampedKeyValueStoreWithHeaders}. - * - * @param supplier a {@link KeyValueBytesStoreSupplier} (cannot be {@code null}) - * @param keySerde the key serde to use - * @param valueSerde the value serde to use; if the serialized bytes is {@code null} for put operations, - * it is treated as delete - * @param key type - * @param value type - * @return an instance of {@link StoreBuilder} than can build a {@link KeyValueStore} - */ - public static StoreBuilder> timestampedKeyValueStoreBuilderWithHeaders(final KeyValueBytesStoreSupplier supplier, - final Serde keySerde, - final Serde valueSerde) { - Objects.requireNonNull(supplier, "supplier cannot be null"); - return new TimestampedKeyValueStoreBuilderWithHeaders<>(supplier, keySerde, valueSerde, Time.SYSTEM); - } } \ No newline at end of file diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignTableJoinProcessorSupplierTests.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignTableJoinProcessorSupplierTests.java index 2d53b83a94fd8..1426c040da7c4 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignTableJoinProcessorSupplierTests.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignTableJoinProcessorSupplierTests.java @@ -210,7 +210,7 @@ private void putInStore(final String fk, final String pk) { private StoreBuilder>> storeBuilder() { final Serde> subscriptionWrapperSerde = new SubscriptionWrapperSerde<>( PK_SERDE_TOPIC_SUPPLIER, Serdes.String()); - return Stores.timestampedKeyValueStoreBuilderWithHeaders( + return Stores.timestampedKeyValueStoreWithHeadersBuilder( Stores.persistentTimestampedKeyValueStore( "Store" ), diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionReceiveProcessorSupplierTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionReceiveProcessorSupplierTest.java index ddfcadfc26144..1cfd33b6d1e09 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionReceiveProcessorSupplierTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionReceiveProcessorSupplierTest.java @@ -517,7 +517,7 @@ private SubscriptionReceiveProcessorSupplier supplier( private StoreBuilder>> storeBuilder() { final Serde> subscriptionWrapperSerde = new SubscriptionWrapperSerde<>( PK_SERDE_TOPIC_SUPPLIER, Serdes.String()); - return Stores.timestampedKeyValueStoreBuilderWithHeaders( + return Stores.timestampedKeyValueStoreWithHeadersBuilder( Stores.persistentTimestampedKeyValueStoreWithHeaders( "Store" ), diff --git a/streams/src/test/java/org/apache/kafka/streams/state/QueryableStoreTypesWithHeadersTest.java b/streams/src/test/java/org/apache/kafka/streams/state/QueryableStoreTypesWithHeadersTest.java index eafe2c5a39c5a..e59050d912fca 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/QueryableStoreTypesWithHeadersTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/QueryableStoreTypesWithHeadersTest.java @@ -30,7 +30,7 @@ public class QueryableStoreTypesWithHeadersTest { @Test public void shouldAcceptTimestampedKeyValueStoreWithHeadersForTimestampedKeyValueStoreType() { final TimestampedKeyValueStoreWithHeaders store = - Stores.timestampedKeyValueStoreBuilderWithHeaders( + Stores.timestampedKeyValueStoreWithHeadersBuilder( Stores.inMemoryKeyValueStore("test-store"), Serdes.String(), Serdes.String()) @@ -45,7 +45,7 @@ public void shouldAcceptTimestampedKeyValueStoreWithHeadersForTimestampedKeyValu @Test public void shouldAcceptTimestampedKeyValueStoreWithHeadersForKeyValueStoreType() { final TimestampedKeyValueStoreWithHeaders store = - Stores.timestampedKeyValueStoreBuilderWithHeaders( + Stores.timestampedKeyValueStoreWithHeadersBuilder( Stores.inMemoryKeyValueStore("test-store"), Serdes.String(), Serdes.String()) @@ -60,7 +60,7 @@ public void shouldAcceptTimestampedKeyValueStoreWithHeadersForKeyValueStoreType( @Test public void shouldNotAcceptTimestampedKeyValueStoreWithHeadersForWindowStoreType() { final TimestampedKeyValueStoreWithHeaders store = - Stores.timestampedKeyValueStoreBuilderWithHeaders( + Stores.timestampedKeyValueStoreWithHeadersBuilder( Stores.inMemoryKeyValueStore("test-store"), Serdes.String(), Serdes.String()) diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProviderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProviderTest.java index cd8692137dd98..f96f8e96d9983 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProviderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProviderTest.java @@ -111,7 +111,7 @@ public void before() { Serdes.String()).build()); stores.put( "ts-kv-store-with-headers", - Stores.timestampedKeyValueStoreBuilderWithHeaders( + Stores.timestampedKeyValueStoreWithHeadersBuilder( Stores.inMemoryKeyValueStore("ts-kv-store-with-headers"), Serdes.String(), Serdes.String()).build()); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java index 180cd121283aa..0bd448159973a 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java @@ -155,7 +155,7 @@ public void before() { Serdes.String()), "the-processor"); topology.addStateStore( - Stores.timestampedKeyValueStoreBuilderWithHeaders( + Stores.timestampedKeyValueStoreWithHeadersBuilder( Stores.inMemoryKeyValueStore("timestamped-kv-store-with-headers"), Serdes.String(), Serdes.String()), diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreWithHeadersSerializerSideEffectTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreWithHeadersSerializerSideEffectTest.java index 918e3e37d7a4e..2fe5671fea0d7 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreWithHeadersSerializerSideEffectTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreWithHeadersSerializerSideEffectTest.java @@ -132,7 +132,7 @@ public void shouldPropagateSerializerHeaderSideEffectToChangelog() { // Create a timestamped key-value store with headers using our custom serializer builder.addStateStore( - Stores.timestampedKeyValueStoreBuilderWithHeaders( + Stores.timestampedKeyValueStoreWithHeadersBuilder( Stores.inMemoryKeyValueStore(STORE_NAME), new HeaderAddingSerde(), // Custom key serializer that adds headers Serdes.String()