Skip to content

Commit cc2cb6a

Browse files
committed
KAFKA-20173: Ensure Metered session-stores pass headers correctly (#21957)
Ensures that all Metered Session-stores (plain and headers) pass headers into de/serializers. Reviewers: Uladzislau Blok <blokv75@gmail.com>, TengYao Chi <frankvicky@apache.org>
1 parent 28d58f6 commit cc2cb6a

7 files changed

Lines changed: 363 additions & 299 deletions

File tree

streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java

Lines changed: 128 additions & 102 deletions
Large diffs are not rendered by default.

streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreWithHeaders.java

Lines changed: 142 additions & 102 deletions
Large diffs are not rendered by default.

streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreWithHeaders.java

Lines changed: 4 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818

1919
import org.apache.kafka.common.header.Headers;
2020
import org.apache.kafka.common.header.internals.RecordHeaders;
21-
import org.apache.kafka.common.metrics.Sensor;
2221
import org.apache.kafka.common.serialization.Serde;
2322
import org.apache.kafka.common.utils.Bytes;
2423
import org.apache.kafka.common.utils.Time;
@@ -45,9 +44,6 @@
4544
import org.apache.kafka.streams.state.WindowStoreIterator;
4645

4746
import java.util.Objects;
48-
import java.util.Set;
49-
import java.util.concurrent.atomic.LongAdder;
50-
import java.util.function.BiFunction;
5147
import java.util.function.Function;
5248

5349
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency;
@@ -184,9 +180,7 @@ private <R> QueryResult<R> runWindowKeyQuery(final WindowKeyQuery<K, ValueTimest
184180
}
185181
);
186182

187-
final QueryResult<MeteredWindowStoreIterator<ValueAndTimestamp<V>>> typedQueryResult =
188-
InternalQueryResultUtil.copyAndSubstituteDeserializedResult(rawResult, typedResult);
189-
queryResult = (QueryResult<R>) typedQueryResult;
183+
queryResult = (QueryResult<R>) InternalQueryResultUtil.copyAndSubstituteDeserializedResult(rawResult, typedResult);
190184
} else {
191185
// For non-timestamped stores, return plain V
192186
final MeteredWindowStoreIterator<V> typedResult = meteredIterator(
@@ -196,10 +190,7 @@ private <R> QueryResult<R> runWindowKeyQuery(final WindowKeyQuery<K, ValueTimest
196190
return vth == null ? null : vth.value();
197191
}
198192
);
199-
200-
final QueryResult<MeteredWindowStoreIterator<V>> typedQueryResult =
201-
InternalQueryResultUtil.copyAndSubstituteDeserializedResult(rawResult, typedResult);
202-
queryResult = (QueryResult<R>) typedQueryResult;
193+
queryResult = (QueryResult<R>) InternalQueryResultUtil.copyAndSubstituteDeserializedResult(rawResult, typedResult);
203194
}
204195
} else {
205196
queryResult = (QueryResult<R>) rawResult;
@@ -420,59 +411,16 @@ private <ValueType> MeteredWindowedKeyValueIterator<K, ValueType> meteredWindowe
420411
rawResult.getResult(),
421412
fetchSensor,
422413
iteratorDurationSensor,
414+
this::deserializeValue,
423415
this::deserializeKey,
416+
ValueTimestampHeaders::headers,
424417
valueConverter,
425418
time,
426419
numOpenIterators,
427420
openIterators
428421
);
429422
}
430423

431-
private final class MeteredWindowedKeyValueWithHeadersIterator<ValueType> extends MeteredWindowedKeyValueIterator<K, ValueType> {
432-
private final BiFunction<byte[], Headers, K> deserializeKey;
433-
private final Function<ValueTimestampHeaders<V>, ValueType> valueConverter;
434-
435-
MeteredWindowedKeyValueWithHeadersIterator(
436-
final KeyValueIterator<Windowed<Bytes>, byte[]> iter,
437-
final Sensor operationSensor,
438-
final Sensor iteratorSensor,
439-
final BiFunction<byte[], Headers, K> deserializeKey,
440-
final Function<ValueTimestampHeaders<V>, ValueType> valueConverter,
441-
final Time time,
442-
final LongAdder numOpenIterators,
443-
final Set<MeteredIterator> openIterators
444-
) {
445-
super(
446-
iter,
447-
operationSensor,
448-
iteratorSensor,
449-
null, // should not be used in super-class
450-
null, // should not be used in super-class
451-
time,
452-
numOpenIterators,
453-
openIterators
454-
);
455-
456-
this.deserializeKey = deserializeKey;
457-
this.valueConverter = valueConverter;
458-
}
459-
460-
@Override
461-
public KeyValue<Windowed<K>, ValueType> next() {
462-
final KeyValue<Windowed<Bytes>, byte[]> next = iter.next();
463-
final ValueTimestampHeaders<V> valueTimestampHeaders = deserializeValue(next.value);
464-
return KeyValue.pair(
465-
windowedKey(next.key, valueTimestampHeaders.headers()),
466-
valueConverter.apply(valueTimestampHeaders)
467-
);
468-
}
469-
470-
private Windowed<K> windowedKey(final Windowed<Bytes> bytesKey, final Headers headers) {
471-
final K key = deserializeKey.apply(bytesKey.key().get(), headers);
472-
return new Windowed<>(key, bytesKey.window());
473-
}
474-
}
475-
476424
private boolean isUnderlyingStoreTimestamped() {
477425
StateStore store = wrapped();
478426
do {
Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.kafka.streams.state.internals;
18+
19+
import org.apache.kafka.common.header.Headers;
20+
import org.apache.kafka.common.metrics.Sensor;
21+
import org.apache.kafka.common.utils.Bytes;
22+
import org.apache.kafka.common.utils.Time;
23+
import org.apache.kafka.streams.KeyValue;
24+
import org.apache.kafka.streams.kstream.Windowed;
25+
import org.apache.kafka.streams.state.KeyValueIterator;
26+
27+
import java.util.Set;
28+
import java.util.concurrent.atomic.LongAdder;
29+
import java.util.function.BiFunction;
30+
import java.util.function.Function;
31+
32+
final class MeteredWindowedKeyValueWithHeadersIterator<K, VInner, VOuter> extends MeteredWindowedKeyValueIterator<K, VOuter> {
33+
private final Function<byte[], VInner> deserializeValue;
34+
private final BiFunction<byte[], Headers, K> deserializeKey;
35+
private final Function<VInner, Headers> headersExtractor;
36+
private final Function<VInner, VOuter> valueConverter;
37+
38+
MeteredWindowedKeyValueWithHeadersIterator(
39+
final KeyValueIterator<Windowed<Bytes>, byte[]> iter,
40+
final Sensor operationSensor,
41+
final Sensor iteratorSensor,
42+
final Function<byte[], VInner> deserializeValue,
43+
final BiFunction<byte[], Headers, K> deserializeKey,
44+
final Function<VInner, Headers> headersExtractor,
45+
final Function<VInner, VOuter> valueConverter,
46+
final Time time,
47+
final LongAdder numOpenIterators,
48+
final Set<MeteredIterator> openIterators
49+
) {
50+
super(
51+
iter,
52+
operationSensor,
53+
iteratorSensor,
54+
null, // should not be used in super-class
55+
null, // should not be used in super-class
56+
time,
57+
numOpenIterators,
58+
openIterators
59+
);
60+
61+
this.deserializeValue = deserializeValue;
62+
this.deserializeKey = deserializeKey;
63+
this.headersExtractor = headersExtractor;
64+
this.valueConverter = valueConverter;
65+
}
66+
67+
@Override
68+
public KeyValue<Windowed<K>, VOuter> next() {
69+
final KeyValue<Windowed<Bytes>, byte[]> next = iter.next();
70+
final VInner valueTimestampHeaders = deserializeValue.apply(next.value);
71+
return KeyValue.pair(
72+
windowedKey(next.key, headersExtractor.apply(valueTimestampHeaders)),
73+
valueConverter.apply(valueTimestampHeaders)
74+
);
75+
}
76+
77+
private Windowed<K> windowedKey(final Windowed<Bytes> bytesKey, final Headers headers) {
78+
final K key = deserializeKey.apply(bytesKey.key().get(), headers);
79+
return new Windowed<>(key, bytesKey.window());
80+
}
81+
}

streams/src/main/java/org/apache/kafka/streams/state/internals/Utils.java

Lines changed: 0 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,6 @@
2121
import org.apache.kafka.common.header.internals.RecordHeaders;
2222
import org.apache.kafka.common.serialization.LongDeserializer;
2323
import org.apache.kafka.common.utils.ByteUtils;
24-
import org.apache.kafka.common.utils.Bytes;
25-
import org.apache.kafka.streams.kstream.Windowed;
2624
import org.apache.kafka.streams.state.StateSerdes;
2725

2826
import java.nio.ByteBuffer;
@@ -54,38 +52,6 @@ public static Headers headers(final byte[] valueWithHeaders) {
5452
return readHeaders(buffer);
5553
}
5654

57-
/**
58-
* Serialize the key with headers into bytes
59-
* @param key the key to serialize
60-
* @param headers the Headers as context
61-
* @param serdes the StateSerdes as serializer
62-
* @return the Bytes of the key
63-
*/
64-
public static <K> Bytes keyBytes(final K key, final Headers headers, final StateSerdes<K, ?> serdes) {
65-
return Bytes.wrap(serdes.rawKey(key, headers));
66-
}
67-
68-
/**
69-
* Serialize the key into bytes
70-
* @param key the key to serialize
71-
* @param serdes the StateSerdes as serializer
72-
* @return the Bytes of the key
73-
*/
74-
static <K> Bytes keyBytes(final K key, final StateSerdes<K, ?> serdes) {
75-
return keyBytes(key, new RecordHeaders(), serdes);
76-
}
77-
78-
/**
79-
* Serialize the session key with headers into bytes
80-
* @param sessionKey the Windowed session key to serialize
81-
* @param headers the Headers as context
82-
* @param serdes the StateSerdes as serializer
83-
* @return the Bytes of the key
84-
*/
85-
static <K> Bytes keyBytes(final Windowed<K> sessionKey, final Headers headers, final StateSerdes<K, ?> serdes) {
86-
return keyBytes(sessionKey.key(), headers, serdes);
87-
}
88-
8955
/**
9056
* Extract the raw aggregation bytes from serialized AggregationWithHeaders,
9157
* stripping the headers prefix.

streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,7 @@ public class MeteredSessionStoreTest {
103103
private static final Windowed<Bytes> WINDOWED_KEY_BYTES = new Windowed<>(KEY_BYTES, new SessionWindow(0, 0));
104104
private static final String VALUE = "value";
105105
private static final byte[] VALUE_BYTES = VALUE.getBytes();
106+
private static final Headers HEADERS = new RecordHeaders();
106107
private static final long START_TIMESTAMP = 24L;
107108
private static final long END_TIMESTAMP = 42L;
108109
private static final int RETENTION_PERIOD = 100;
@@ -139,8 +140,7 @@ public void setUp() {
139140
setUpWithoutContext();
140141
metrics.config().recordLevel(Sensor.RecordingLevel.DEBUG);
141142
when(context.applicationId()).thenReturn(APPLICATION_ID);
142-
when(context.metrics())
143-
.thenReturn(new StreamsMetricsImpl(metrics, "test", mockTime));
143+
when(context.metrics()).thenReturn(new StreamsMetricsImpl(metrics, "test", mockTime));
144144
when(context.taskId()).thenReturn(taskId);
145145
when(context.changelogFor(STORE_NAME)).thenReturn(CHANGELOG_TOPIC);
146146
when(innerStore.name()).thenReturn(STORE_NAME);
@@ -187,12 +187,13 @@ private void doShouldPassChangelogTopicNameToStateStoreSerde(final String topic)
187187
final Deserializer<String> valueDeserializer = mock(Deserializer.class);
188188
final Serializer<String> valueSerializer = mock(Serializer.class);
189189
when(keySerde.serializer()).thenReturn(keySerializer);
190-
when(keySerializer.serialize(topic, new RecordHeaders(), KEY)).thenReturn(KEY.getBytes());
190+
when(keySerializer.serialize(topic, HEADERS, KEY)).thenReturn(KEY.getBytes());
191191
when(valueSerde.deserializer()).thenReturn(valueDeserializer);
192-
when(valueDeserializer.deserialize(topic, new RecordHeaders(), VALUE_BYTES)).thenReturn(VALUE);
192+
when(valueDeserializer.deserialize(topic, HEADERS, VALUE_BYTES)).thenReturn(VALUE);
193193
when(valueSerde.serializer()).thenReturn(valueSerializer);
194-
when(valueSerializer.serialize(topic, new RecordHeaders(), VALUE)).thenReturn(VALUE_BYTES);
194+
when(valueSerializer.serialize(topic, HEADERS, VALUE)).thenReturn(VALUE_BYTES);
195195
when(innerStore.fetchSession(KEY_BYTES, START_TIMESTAMP, END_TIMESTAMP)).thenReturn(VALUE_BYTES);
196+
when(context.headers()).thenReturn(HEADERS);
196197
store = new MeteredSessionStore<>(
197198
innerStore,
198199
STORE_TYPE,

streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreWithHeadersTest.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -841,6 +841,8 @@ private MeteredSessionStoreWithHeaders<String, String> createStoreWithMockSerdes
841841
lenient().when(keyDeserializer.deserialize(any(), eq(HEADERS), eq(KEY.getBytes())))
842842
.thenReturn(KEY);
843843

844+
when(context.headers()).thenReturn(new RecordHeaders());
845+
844846
final MeteredSessionStoreWithHeaders<String, String> mockStore = new MeteredSessionStoreWithHeaders<>(
845847
innerStore,
846848
STORE_TYPE,

0 commit comments

Comments
 (0)