Skip to content

Commit ba2a75c

Browse files
committed
docs: update 'Testing a Streams Application' example code
Update example code to latest Kafka Streams Test Util API and Junit 5 API.
1 parent e28ef11 commit ba2a75c

1 file changed

Lines changed: 94 additions & 93 deletions

File tree

content/en/42/streams/developer-guide/testing.md

Lines changed: 94 additions & 93 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ To verify the output, you can use `TestOutputTopic` where you configure the topi
7272

7373

7474
TestOutputTopic<String, Long> outputTopic = testDriver.createOutputTopic("output-topic", stringSerde.deserializer(), longSerde.deserializer());
75-
assertThat(outputTopic.readKeyValue(), equalTo(new KeyValue<>("key", 42L)));
75+
assertEquals(KeyValue.pair("a", 42L), outputTopic.readKeyValue());
7676

7777
`TopologyTestDriver` supports punctuations, too. Event-time punctuations are triggered automatically based on the processed records' timestamps. Wall-clock-time punctuations can also be triggered by advancing the test driver's wall-clock-time (the driver mocks wall-clock-time internally to give users control over it).
7878

@@ -98,134 +98,134 @@ The following example demonstrates how to use the test driver and helper classes
9898
private TestInputTopic<String, Long> inputTopic;
9999
private TestOutputTopic<String, Long> outputTopic;
100100
private KeyValueStore<String, Long> store;
101-
102-
private Serde<String> stringSerde = new Serdes.StringSerde();
103-
private Serde<Long> longSerde = new Serdes.LongSerde();
104-
105-
@Before
101+
102+
private final Serde<String> stringSerde = Serdes.String();
103+
private final Serde<Long> longSerde = Serdes.Long();
104+
105+
@BeforeEach
106106
public void setup() {
107-
Topology topology = new Topology();
108-
topology.addSource("sourceProcessor", "input-topic");
109-
topology.addProcessor("aggregator", new CustomMaxAggregatorSupplier(), "sourceProcessor");
110-
topology.addStateStore(
111-
Stores.keyValueStoreBuilder(
112-
Stores.inMemoryKeyValueStore("aggStore"),
113-
Serdes.String(),
114-
Serdes.Long()).withLoggingDisabled(), // need to disable logging to allow store pre-populating
115-
"aggregator");
116-
topology.addSink("sinkProcessor", "result-topic", "aggregator");
117-
107+
108+
var topology = new Topology()
109+
.addSource("sourceProcessor", "input-topic")
110+
.addProcessor("aggregator", new CustomMaxAggregatorSupplier(), "sourceProcessor")
111+
.addStateStore(
112+
Stores.keyValueStoreBuilder(
113+
Stores.persistentKeyValueStore("aggStore"),
114+
stringSerde,
115+
longSerde
116+
),
117+
"aggregator")
118+
.addSink("sinkProcessor", "result-topic", "aggregator");
119+
118120
// setup test driver
119-
Properties props = new Properties();
120-
props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
121-
props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Long().getClass().getName());
121+
var props = new Properties();
122+
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, stringSerde.getClass().getName());
123+
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, longSerde.getClass().getName());
122124
testDriver = new TopologyTestDriver(topology, props);
123-
125+
124126
// setup test topics
125127
inputTopic = testDriver.createInputTopic("input-topic", stringSerde.serializer(), longSerde.serializer());
126128
outputTopic = testDriver.createOutputTopic("result-topic", stringSerde.deserializer(), longSerde.deserializer());
127-
129+
128130
// pre-populate store
129131
store = testDriver.getKeyValueStore("aggStore");
130132
store.put("a", 21L);
131133
}
132-
133-
@After
134+
135+
@AfterEach
134136
public void tearDown() {
135137
testDriver.close();
136138
}
137139

138140
@Test
139141
public void shouldFlushStoreForFirstInput() {
140142
inputTopic.pipeInput("a", 1L);
141-
assertThat(outputTopic.readKeyValue(), equalTo(new KeyValue<>("a", 21L)));
142-
assertThat(outputTopic.isEmpty(), is(true));
143+
assertEquals(KeyValue.pair("a", 21L), outputTopic.readKeyValue());
144+
assertTrue(outputTopic.isEmpty());
143145
}
144-
146+
145147
@Test
146148
public void shouldNotUpdateStoreForSmallerValue() {
147149
inputTopic.pipeInput("a", 1L);
148-
assertThat(store.get("a"), equalTo(21L));
149-
assertThat(outputTopic.readKeyValue(), equalTo(new KeyValue<>("a", 21L)));
150-
assertThat(outputTopic.isEmpty(), is(true));
150+
assertEquals(21L, store.get("a"));
151+
assertEquals(KeyValue.pair("a", 21L), outputTopic.readKeyValue());
152+
assertTrue(outputTopic.isEmpty());
151153
}
152-
154+
153155
@Test
154-
public void shouldNotUpdateStoreForLargerValue() {
156+
public void shouldUpdateStoreForLargerValue() {
155157
inputTopic.pipeInput("a", 42L);
156-
assertThat(store.get("a"), equalTo(42L));
157-
assertThat(outputTopic.readKeyValue(), equalTo(new KeyValue<>("a", 42L)));
158-
assertThat(outputTopic.isEmpty(), is(true));
158+
assertEquals(42L, store.get("a"));
159+
assertEquals(KeyValue.pair("a", 42L), outputTopic.readKeyValue());
160+
assertTrue(outputTopic.isEmpty());
159161
}
160-
162+
161163
@Test
162164
public void shouldUpdateStoreForNewKey() {
163165
inputTopic.pipeInput("b", 21L);
164-
assertThat(store.get("b"), equalTo(21L));
165-
assertThat(outputTopic.readKeyValue(), equalTo(new KeyValue<>("a", 21L)));
166-
assertThat(outputTopic.readKeyValue(), equalTo(new KeyValue<>("b", 21L)));
167-
assertThat(outputTopic.isEmpty(), is(true));
166+
assertEquals(21L, store.get("b"));
167+
assertEquals(KeyValue.pair("a", 21L), outputTopic.readKeyValue());
168+
assertEquals(KeyValue.pair("b", 21L), outputTopic.readKeyValue());
169+
assertTrue(outputTopic.isEmpty());
168170
}
169-
171+
170172
@Test
171-
public void shouldPunctuateIfEvenTimeAdvances() {
172-
final Instant recordTime = Instant.now();
173-
inputTopic.pipeInput("a", 1L, recordTime);
174-
assertThat(outputTopic.readKeyValue(), equalTo(new KeyValue<>("a", 21L)));
175-
176-
inputTopic.pipeInput("a", 1L, recordTime);
177-
assertThat(outputTopic.isEmpty(), is(true));
178-
179-
inputTopic.pipeInput("a", 1L, recordTime.plusSeconds(10L));
180-
assertThat(outputTopic.readKeyValue(), equalTo(new KeyValue<>("a", 21L)));
181-
assertThat(outputTopic.isEmpty(), is(true));
173+
public void shouldPunctuateIfStreamTimeAdvances() {
174+
var recordTime = Instant.now();
175+
inputTopic.pipeInput("a", 1L, recordTime);
176+
assertEquals(KeyValue.pair("a", 21L), outputTopic.readKeyValue());
177+
178+
inputTopic.pipeInput("a", 1L, recordTime);
179+
assertTrue(outputTopic.isEmpty());
180+
181+
inputTopic.pipeInput("a", 1L, recordTime.plusSeconds(10));
182+
assertEquals(KeyValue.pair("a", 21L), outputTopic.readKeyValue());
183+
assertTrue(outputTopic.isEmpty());
182184
}
183-
185+
184186
@Test
185187
public void shouldPunctuateIfWallClockTimeAdvances() {
186188
testDriver.advanceWallClockTime(Duration.ofSeconds(60));
187-
assertThat(outputTopic.readKeyValue(), equalTo(new KeyValue<>("a", 21L)));
188-
assertThat(outputTopic.isEmpty(), is(true));
189+
assertEquals(KeyValue.pair("a", 21L), outputTopic.readKeyValue());
190+
assertTrue(outputTopic.isEmpty());
189191
}
190192

191-
public class CustomMaxAggregatorSupplier implements ProcessorSupplier<String, Long> {
193+
static class CustomMaxAggregatorSupplier implements ProcessorSupplier<String, Long, String, Long> {
192194
@Override
193-
public Processor<String, Long> get() {
195+
public Processor<String, Long, String, Long> get() {
194196
return new CustomMaxAggregator();
195197
}
196198
}
197-
198-
public class CustomMaxAggregator implements Processor<String, Long> {
199-
ProcessorContext context;
199+
200+
static class CustomMaxAggregator extends ContextualProcessor<String, Long, String, Long> {
201+
200202
private KeyValueStore<String, Long> store;
201-
202-
@SuppressWarnings("unchecked")
203+
203204
@Override
204-
public void init(ProcessorContext context) {
205-
this.context = context;
206-
context.schedule(Duration.ofSeconds(60), PunctuationType.WALL_CLOCK_TIME, time -> flushStore());
207-
context.schedule(Duration.ofSeconds(10), PunctuationType.STREAM_TIME, time -> flushStore());
208-
store = (KeyValueStore<String, Long>) context.getStateStore("aggStore");
205+
public void init(ProcessorContext<String, Long> context) {
206+
super.init(context);
207+
context.schedule(Duration.ofSeconds(60), WALL_CLOCK_TIME, this::flushStore);
208+
context.schedule(Duration.ofSeconds(10), STREAM_TIME, this::flushStore);
209+
store = context.getStateStore("aggStore");
209210
}
210-
211+
211212
@Override
212-
public void process(String key, Long value) {
213-
Long oldValue = store.get(key);
214-
if (oldValue == null || value > oldValue) {
215-
store.put(key, value);
213+
public void process(Record<String, Long> record) {
214+
var oldValue = store.get(record.key());
215+
if (oldValue == null || record.value() > oldValue) {
216+
store.put(record.key(), record.value());
216217
}
217218
}
218-
219-
private void flushStore() {
220-
KeyValueIterator<String, Long> it = store.all();
221-
while (it.hasNext()) {
222-
KeyValue<String, Long> next = it.next();
223-
context.forward(next.key, next.value);
219+
220+
private void flushStore(long timestamp) {
221+
try (var it = store.all()) {
222+
while (it.hasNext()) {
223+
var next = it.next();
224+
context().forward(new Record<>(next.key, next.value, timestamp));
225+
}
224226
}
225227
}
226-
227-
@Override
228-
public void close() {}
228+
229229
}
230230

231231
# Unit Testing Processors
@@ -260,13 +260,13 @@ The mock will capture any values that your processor forwards. You can make asse
260260
processorUnderTest.process("key", "value");
261261

262262
final Iterator<CapturedForward<? extends String, ? extends Long>> forwarded = context.forwarded().iterator();
263-
assertEquals(forwarded.next().record(), new Record<>(..., ...));
263+
assertEquals(new Record<>(..., ...), forwarded.next().record());
264264
assertFalse(forwarded.hasNext());
265265

266266
// you can reset forwards to clear the captured data. This may be helpful in constructing longer scenarios.
267267
context.resetForwards();
268268

269-
assertEquals(context.forwarded().size(), 0);
269+
assertEquals(0, context.forwarded().size());
270270

271271
If your processor forwards to specific child processors, you can query the context for captured data by child name:
272272

@@ -296,28 +296,29 @@ Once these are set, the context will continue returning the same values, until y
296296

297297
In case your punctuator is stateful, the mock context allows you to register state stores. You're encouraged to use a simple in-memory store of the appropriate type (KeyValue, Windowed, or Session), since the mock context does _not_ manage changelogs, state directories, etc.
298298

299-
300-
final KeyValueStore<String, Integer> store =
301-
Stores.keyValueStoreBuilder(
302-
Stores.inMemoryKeyValueStore("myStore"),
303-
Serdes.String(),
304-
Serdes.Integer()
305-
)
299+
final KeyValueStore<String, Integer> store = Stores
300+
.keyValueStoreBuilder(
301+
Stores.inMemoryKeyValueStore("myStore"),
302+
Serdes.String(),
303+
Serdes.Integer())
306304
.withLoggingDisabled() // Changelog is not supported by MockProcessorContext.
307305
.build();
308-
store.init(context, store);
309-
context.register(store, /*deprecated parameter*/ false, /*parameter unused in mock*/ null);
306+
307+
context = new MockProcessorContext<>();
308+
store.init(context.getStateStoreContext(), store);
309+
context.addStateStore(store);
310310

311311
**Verifying punctuators**
312312

313313
Processors can schedule punctuators to handle periodic tasks. The mock context does _not_ automatically execute punctuators, but it does capture them to allow you to unit test them as well:
314314

315315

316316
final MockProcessorContext.CapturedPunctuator capturedPunctuator = context.scheduledPunctuators().get(0);
317-
final long interval = capturedPunctuator.getIntervalMs();
317+
final Duration interval = capturedPunctuator.getInterval();
318318
final PunctuationType type = capturedPunctuator.getType();
319319
final boolean cancelled = capturedPunctuator.cancelled();
320320
final Punctuator punctuator = capturedPunctuator.getPunctuator();
321+
321322
punctuator.punctuate(/*timestamp*/ 0L);
322323

323324
If you need to write tests involving automatic firing of scheduled punctuators, we recommend creating a simple topology with your processor and using the [`TopologyTestDriver`](testing.html#testing-topologytestdriver).

0 commit comments

Comments
 (0)