Skip to content

Commit 900aa31

Browse files
committed
fix incorrect public method name
1 parent d3678b9 commit 900aa31

8 files changed

Lines changed: 16 additions & 16 deletions

File tree

streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/HeadersStoreUpgradeIntegrationTest.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1622,7 +1622,7 @@ private void shouldMigrateSessionStoreToSessionStoreWithHeaders(final boolean is
16221622
final StreamsBuilder newBuilder = new StreamsBuilder();
16231623
final AtomicReference<SessionWithHeadersProcessor> processorRef = new AtomicReference<>();
16241624
newBuilder.addStateStore(
1625-
Stores.sessionStoreBuilderWithHeaders(
1625+
Stores.sessionStoreWithHeadersBuilder(
16261626
isPersistent ? Stores.persistentSessionStoreWithHeaders(SESSION_STORE_NAME, Duration.ofMillis(RETENTION_MS)) :
16271627
Stores.inMemorySessionStore(SESSION_STORE_NAME, Duration.ofMillis(RETENTION_MS)),
16281628
Serdes.String(),
@@ -1680,7 +1680,7 @@ public void shouldProxySessionStoreToSessionStoreWithHeaders() throws Exception
16801680
final StreamsBuilder newBuilder = new StreamsBuilder();
16811681
final AtomicReference<SessionWithHeadersProcessor> processorRef = new AtomicReference<>();
16821682
newBuilder.addStateStore(
1683-
Stores.sessionStoreBuilderWithHeaders(
1683+
Stores.sessionStoreWithHeadersBuilder(
16841684
Stores.persistentSessionStore(SESSION_STORE_NAME, Duration.ofMillis(RETENTION_MS)), // non-headers supplier!
16851685
Serdes.String(),
16861686
Serdes.String()))
@@ -1941,7 +1941,7 @@ private void setupAndPopulateSessionStoreWithHeaders(final Properties props) thr
19411941
final StreamsBuilder headersBuilder = new StreamsBuilder();
19421942
final AtomicReference<SessionWithHeadersProcessor> processorRef = new AtomicReference<>();
19431943
headersBuilder.addStateStore(
1944-
Stores.sessionStoreBuilderWithHeaders(
1944+
Stores.sessionStoreWithHeadersBuilder(
19451945
Stores.persistentSessionStoreWithHeaders(SESSION_STORE_NAME, Duration.ofMillis(RETENTION_MS)),
19461946
Serdes.String(),
19471947
Serdes.String()))

streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionStoreMaterializer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ public StoreBuilder<?> builder() {
7070

7171
final StoreBuilder<?> builder;
7272
if (supplier instanceof HeadersBytesStoreSupplier) {
73-
builder = Stores.sessionStoreBuilderWithHeaders(
73+
builder = Stores.sessionStoreWithHeadersBuilder(
7474
supplier,
7575
materialized.keySerde(),
7676
materialized.valueSerde()

streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreBuilderWrapper.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
import org.apache.kafka.streams.StreamsConfig;
2020
import org.apache.kafka.streams.state.StoreBuilder;
2121
import org.apache.kafka.streams.state.internals.SessionStoreBuilder;
22-
import org.apache.kafka.streams.state.internals.SessionStoreBuilderWithHeaders;
22+
import org.apache.kafka.streams.state.internals.SessionStoreWithHeadersBuilder;
2323
import org.apache.kafka.streams.state.internals.TimestampedWindowStoreBuilder;
2424
import org.apache.kafka.streams.state.internals.TimestampedWindowStoreWithHeadersBuilder;
2525
import org.apache.kafka.streams.state.internals.VersionedKeyValueStoreBuilder;
@@ -75,8 +75,8 @@ public long retentionPeriod() {
7575
return ((TimestampedWindowStoreWithHeadersBuilder<?, ?>) builder).retentionPeriod();
7676
} else if (builder instanceof SessionStoreBuilder) {
7777
return ((SessionStoreBuilder<?, ?>) builder).retentionPeriod();
78-
} else if (builder instanceof SessionStoreBuilderWithHeaders) {
79-
return ((SessionStoreBuilderWithHeaders<?, ?>) builder).retentionPeriod();
78+
} else if (builder instanceof SessionStoreWithHeadersBuilder) {
79+
return ((SessionStoreWithHeadersBuilder<?, ?>) builder).retentionPeriod();
8080
} else {
8181
throw new IllegalStateException("retentionPeriod is not supported when not a window store");
8282
}
@@ -112,7 +112,7 @@ public boolean isWindowStore() {
112112
|| builder instanceof TimestampedWindowStoreBuilder
113113
|| builder instanceof TimestampedWindowStoreWithHeadersBuilder
114114
|| builder instanceof SessionStoreBuilder
115-
|| builder instanceof SessionStoreBuilderWithHeaders;
115+
|| builder instanceof SessionStoreWithHeadersBuilder;
116116
}
117117

118118
@Override

streams/src/main/java/org/apache/kafka/streams/state/Stores.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@
3131
import org.apache.kafka.streams.state.internals.RocksDbVersionedKeyValueBytesStoreSupplier;
3232
import org.apache.kafka.streams.state.internals.RocksDbWindowBytesStoreSupplier;
3333
import org.apache.kafka.streams.state.internals.SessionStoreBuilder;
34-
import org.apache.kafka.streams.state.internals.SessionStoreBuilderWithHeaders;
34+
import org.apache.kafka.streams.state.internals.SessionStoreWithHeadersBuilder;
3535
import org.apache.kafka.streams.state.internals.TimestampedKeyValueStoreBuilder;
3636
import org.apache.kafka.streams.state.internals.TimestampedKeyValueStoreBuilderWithHeaders;
3737
import org.apache.kafka.streams.state.internals.TimestampedWindowStoreBuilder;
@@ -658,12 +658,12 @@ public static <K, V> StoreBuilder<SessionStore<K, V>> sessionStoreBuilder(final
658658
* @param <V> value type
659659
* @return an instance of {@link StoreBuilder} than can build a {@link SessionStoreWithHeaders}
660660
*/
661-
public static <K, V> StoreBuilder<SessionStoreWithHeaders<K, V>> sessionStoreBuilderWithHeaders(
661+
public static <K, V> StoreBuilder<SessionStoreWithHeaders<K, V>> sessionStoreWithHeadersBuilder(
662662
final SessionBytesStoreSupplier supplier,
663663
final Serde<K> keySerde,
664664
final Serde<V> valueSerde) {
665665
Objects.requireNonNull(supplier, "supplier cannot be null");
666-
return new SessionStoreBuilderWithHeaders<>(supplier, keySerde, valueSerde, Time.SYSTEM);
666+
return new SessionStoreWithHeadersBuilder<>(supplier, keySerde, valueSerde, Time.SYSTEM);
667667
}
668668

669669
/**

streams/src/main/java/org/apache/kafka/streams/state/internals/SessionStoreBuilderWithHeaders.java renamed to streams/src/main/java/org/apache/kafka/streams/state/internals/SessionStoreWithHeadersBuilder.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,12 +37,12 @@
3737
* {@link AggregationWithHeaders} as the value wrapper and wires up the
3838
* header-aware store stack (change-logging, caching, metering).
3939
*/
40-
public class SessionStoreBuilderWithHeaders<K, V>
40+
public class SessionStoreWithHeadersBuilder<K, V>
4141
extends AbstractStoreBuilder<K, AggregationWithHeaders<V>, SessionStoreWithHeaders<K, V>> {
4242

4343
private final SessionBytesStoreSupplier storeSupplier;
4444

45-
public SessionStoreBuilderWithHeaders(final SessionBytesStoreSupplier storeSupplier,
45+
public SessionStoreWithHeadersBuilder(final SessionBytesStoreSupplier storeSupplier,
4646
final Serde<K> keySerde,
4747
final Serde<V> valueSerde,
4848
final Time time) {

streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,7 @@ private void initStore(final boolean enableCaching) {
154154
Stores.persistentSessionStore(STORE_NAME, ofMillis(GAP_MS * 3));
155155

156156
final StoreBuilder<SessionStoreWithHeaders<String, Long>> storeBuilder =
157-
Stores.sessionStoreBuilderWithHeaders(supplier, Serdes.String(), Serdes.Long())
157+
Stores.sessionStoreWithHeadersBuilder(supplier, Serdes.String(), Serdes.Long())
158158
.withLoggingDisabled();
159159

160160
if (enableCaching && emitStrategy.type() != EmitStrategy.StrategyType.ON_WINDOW_CLOSE) {

streams/src/test/java/org/apache/kafka/streams/state/QueryableStoreTypesWithHeadersTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -200,7 +200,7 @@ public void shouldAcceptRegularWindowStoreForWindowStoreType() {
200200
@Test
201201
public void shouldAcceptSessionStoreWithHeadersForSessionStoreType() {
202202
final SessionStoreWithHeaders<String, String> store =
203-
Stores.sessionStoreBuilderWithHeaders(
203+
Stores.sessionStoreWithHeadersBuilder(
204204
Stores.inMemorySessionStore(
205205
"test-session-store",
206206
Duration.ofMillis(100)),

streams/src/test/java/org/apache/kafka/streams/state/internals/SessionStoreWithHeadersSerializerSideEffectTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,7 @@ public void shouldPropagateSerializerHeaderSideEffectToChangelog() {
135135

136136
// Create a session store with headers using our custom serializer
137137
builder.addStateStore(
138-
Stores.sessionStoreBuilderWithHeaders(
138+
Stores.sessionStoreWithHeadersBuilder(
139139
Stores.inMemorySessionStore(
140140
STORE_NAME,
141141
Duration.ofMillis(10000L)

0 commit comments

Comments
 (0)