Skip to content

Commit d8c5382

Browse files
authored
Clean up previous messages (#742)
1 parent 2acb07e commit d8c5382

5 files changed

Lines changed: 22 additions & 19 deletions

File tree

basyx.aasrepository/basyx.aasrepository-feature-kafka/src/test/java/org/eclipse/digitaltwin/basyx/aasrepository/feature/kafka/KafkaEventsInMemoryStorageIntegrationTest.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@ public class KafkaEventsInMemoryStorageIntegrationTest {
8686
@Before
8787
public void awaitAssignment() throws InterruptedException {
8888
listener.awaitTopicAssignment();
89+
cleanupPreviousMessages();
8990
FileRepository fileRepo = new InMemoryFileRepository();
9091
AasBackend aasRepositoryBackend = new InMemoryAasBackend();
9192
AasServiceFactory sf = new CrudAasServiceFactory(aasRepositoryBackend, fileRepo);
@@ -94,7 +95,9 @@ public void awaitAssignment() throws InterruptedException {
9495

9596
cleanup();
9697
}
97-
98+
private void cleanupPreviousMessages() throws InterruptedException {
99+
while (listener.next(1, TimeUnit.SECONDS) != null);
100+
}
98101

99102
@Test
100103
public void testCreateAas() throws InterruptedException {
@@ -245,9 +248,7 @@ public void cleanup() throws InterruptedException {
245248
Assert.assertEquals(AasEventType.AAS_DELETED, deletedEvt.getType());
246249
Assert.assertEquals(aas.getId(), deletedEvt.getId());
247250
}
248-
}
249-
@After
250-
public void assertNoAdditionalEvent() throws InterruptedException {
251-
Assert.assertNull(listener.next(1, TimeUnit.SECONDS));
251+
AasEvent evt = listener.next(1, TimeUnit.SECONDS);
252+
Assert.assertNull(evt);
252253
}
253254
}

basyx.submodelrepository/basyx.submodelrepository-feature-kafka/src/test/java/org/eclipse/digitaltwin/basyx/submodelrepository/feature/kafka/KafkaEventsInMemoryStorageIntegrationTest.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,9 +105,13 @@ public class KafkaEventsInMemoryStorageIntegrationTest {
105105
public void awaitAssignment() throws InterruptedException {
106106
listener.awaitTopicAssignment();
107107
repo = feature.decorate(factory).create();
108-
108+
cleanupPreviousMessages();
109109
cleanup();
110110
}
111+
112+
private void cleanupPreviousMessages() throws InterruptedException {
113+
while (listener.next(1, TimeUnit.SECONDS) != null);
114+
}
111115

112116
@Test
113117
public void testSubmodelCreated() throws InterruptedException {
@@ -351,6 +355,8 @@ public void cleanup() throws InterruptedException {
351355
Assert.assertEquals(sm.getId(), evt.getId());
352356
}
353357
}
358+
SubmodelEvent evt = listener.next(1, TimeUnit.SECONDS);
359+
Assert.assertNull(evt);
354360
}
355361

356362

basyx.submodelservice/basyx.submodelservice-feature-kafka/src/test/java/org/eclipse/digitaltwin/basyx/submodelservice/feature/kafka/KafkaSubmodelServiceIdsOnlySmokeTest.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,14 +89,18 @@ public class KafkaSubmodelServiceIdsOnlySmokeTest {
8989
public void awaitAssignment() throws InterruptedException, SerializationException {
9090
listener.awaitTopicAssignment();
9191

92-
assertNoAdditionalMessage();
92+
cleanupPreviousMessages();
9393

9494
FileRepository repository = new InMemoryFileRepository();
9595
SubmodelBackend backend = new InMemorySubmodelBackend();
9696
SubmodelServiceFactory smFactory = new CrudSubmodelServiceFactory(backend ,repository);
9797
service = feature.decorate(smFactory).create(submodel);
9898
}
9999

100+
private void cleanupPreviousMessages() throws InterruptedException {
101+
while (listener.next(1, TimeUnit.SECONDS) != null);
102+
}
103+
100104
@After
101105
public void assertNoAdditionalMessage() throws InterruptedException {
102106
SubmodelEvent evt = listener.next(1, TimeUnit.SECONDS);

basyx.submodelservice/basyx.submodelservice-feature-kafka/src/test/java/org/eclipse/digitaltwin/basyx/submodelservice/feature/kafka/KafkaSubmodelServiceSubmodelElementsEventsIntegrationTest.java

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -90,18 +90,16 @@ public class KafkaSubmodelServiceSubmodelElementsEventsIntegrationTest {
9090
public void awaitAssignment() throws InterruptedException {
9191
listener.awaitTopicAssignment();
9292

93-
skipAdditionalMessage();
93+
cleanupPreviousMessages();
9494

9595
FileRepository repository = new InMemoryFileRepository();
9696
SubmodelBackend backend = new InMemorySubmodelBackend();
9797
SubmodelServiceFactory smFactory = new CrudSubmodelServiceFactory(backend ,repository);
9898
service = feature.decorate(smFactory).create(submodel);
9999
}
100-
101-
@After
102-
public void skipAdditionalMessage() throws InterruptedException {
103-
while(listener.next(1, TimeUnit.SECONDS) != null);
104-
100+
101+
private void cleanupPreviousMessages() throws InterruptedException {
102+
while (listener.next(1, TimeUnit.SECONDS) != null);
105103
}
106104

107105
@Test

basyx.submodelservice/basyx.submodelservice-feature-kafka/src/test/java/org/eclipse/digitaltwin/basyx/submodelservice/feature/kafka/KafkaSubmodelServiceSubmodelEventsIntegrationTest.java

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -81,12 +81,6 @@ public void awaitAssignment() throws InterruptedException {
8181
listener.awaitTopicAssignment();
8282
}
8383

84-
@After
85-
public void assertGotTearDownMessage() throws InterruptedException {
86-
SubmodelEvent evt = listener.next(1, TimeUnit.MINUTES);
87-
Assert.assertNull(evt);
88-
}
89-
9084
@Test
9185
public void testSubmodelEvents() throws InterruptedException {
9286
// we expect the "onStartup" submodel created event

0 commit comments

Comments
 (0)