Skip to content

Commit 8bbddab

Browse files
authored
Singleton Kafka Consumer (#762)
* Adjust topic-available timeouts and kafka healthcheck Also rremove label from broker list and just use localhost with portname * Use previous health check * Use previous healthcheck it did not work before because we did specify the .sh ending * Uncomment the submodel service registration test the test setup is not reliable * Trigger build * Remove await for topic assignment should not be required because the healthcheck is now properly set up * Add null checks * Use synchronized Kafka access the asynchron spring kafka event are producing race conditions in tests * Add logging and fix topic name in authorized client test * Add seek to end to make it more secure * Initialialize adapter in beforeall * Use singleton kafka consumer and poll the messages asynchronously * Alter cleanup metho * Remove commented part
1 parent 483d869 commit 8bbddab

41 files changed

Lines changed: 649 additions & 992 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

basyx.aasenvironment/basyx.aasenvironment.component/pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,11 @@
2020

2121
<dependencies>
2222

23+
<dependency>
24+
<groupId>org.eclipse.digitaltwin.basyx</groupId>
25+
<artifactId>basyx.kafka</artifactId>
26+
<scope>test</scope>
27+
</dependency>
2328
<dependency>
2429
<groupId>org.eclipse.digitaltwin.basyx</groupId>
2530
<artifactId>basyx.aasrepository.component</artifactId>

basyx.aasenvironment/basyx.aasenvironment.component/src/test/java/org/eclipse/digitaltwin/basyx/aasenvironment/component/KafkaEventsInMemoryStorageIntegrationTest.java

Lines changed: 29 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -29,15 +29,15 @@
2929
import org.eclipse.digitaltwin.aas4j.v3.model.AssetAdministrationShell;
3030
import org.eclipse.digitaltwin.aas4j.v3.model.Submodel;
3131
import org.eclipse.digitaltwin.basyx.aasrepository.AasRepository;
32-
import org.eclipse.digitaltwin.basyx.aasrepository.feature.kafka.AasEventKafkaListener;
3332
import org.eclipse.digitaltwin.basyx.aasrepository.feature.kafka.KafkaAasRepositoryFeature;
3433
import org.eclipse.digitaltwin.basyx.aasrepository.feature.kafka.TestShells;
3534
import org.eclipse.digitaltwin.basyx.aasrepository.feature.kafka.events.model.AasEvent;
3635
import org.eclipse.digitaltwin.basyx.aasrepository.feature.kafka.events.model.AasEventType;
3736
import org.eclipse.digitaltwin.basyx.core.pagination.PaginationInfo;
37+
import org.eclipse.digitaltwin.basyx.kafka.KafkaAdapter;
38+
import org.eclipse.digitaltwin.basyx.kafka.KafkaAdapters;
3839
import org.eclipse.digitaltwin.basyx.submodelrepository.SubmodelRepository;
3940
import org.eclipse.digitaltwin.basyx.submodelrepository.feature.kafka.KafkaSubmodelRepositoryFeature;
40-
import org.eclipse.digitaltwin.basyx.submodelservice.feature.kafka.SubmodelEventKafkaListener;
4141
import org.eclipse.digitaltwin.basyx.submodelservice.feature.kafka.TestSubmodels;
4242
import org.eclipse.digitaltwin.basyx.submodelservice.feature.kafka.events.model.SubmodelEvent;
4343
import org.eclipse.digitaltwin.basyx.submodelservice.feature.kafka.events.model.SubmodelEventType;
@@ -50,7 +50,6 @@
5050
import org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureMockMvc;
5151
import org.springframework.boot.test.context.SpringBootTest;
5252
import org.springframework.context.annotation.ComponentScan;
53-
import org.springframework.context.annotation.Import;
5453
import org.springframework.http.MediaType;
5554
import org.springframework.test.annotation.DirtiesContext;
5655
import org.springframework.test.annotation.DirtiesContext.ClassMode;
@@ -65,94 +64,86 @@
6564
*/
6665
@DirtiesContext(classMode = ClassMode.AFTER_CLASS)
6766
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.MOCK)
68-
@ComponentScan(basePackages = { "org.eclipse.digitaltwin.basyx"})
67+
@ComponentScan(basePackages = { "org.eclipse.digitaltwin.basyx" })
6968
@RunWith(SpringRunner.class)
70-
@TestPropertySource(properties = {
71-
"basyx.environment=",
72-
"basyx.feature.kafka.enabled=true",
73-
"spring.kafka.bootstrap-servers=PLAINTEXT_HOST://localhost:9092"
74-
})
69+
@TestPropertySource(properties = { "basyx.environment=", "basyx.feature.kafka.enabled=true", "spring.kafka.bootstrap-servers=localhost:9092" })
7570
@AutoConfigureMockMvc
76-
@Import({ SubmodelEventKafkaListener.class, AasEventKafkaListener.class})
7771
public class KafkaEventsInMemoryStorageIntegrationTest {
78-
79-
@Autowired
80-
private AasEventKafkaListener aasEventListener;
81-
82-
@Autowired
83-
private SubmodelEventKafkaListener submodelEventListener;
84-
72+
8573
@Autowired
8674
private KafkaAasRepositoryFeature aasFeature;
87-
75+
8876
@Autowired
8977
private KafkaSubmodelRepositoryFeature submodelFeature;
9078

9179
@Autowired
9280
private MockMvc mvc;
93-
81+
9482
@Autowired
9583
private JsonSerializer serializer;
96-
84+
9785
@Autowired
9886
private SubmodelRepository smRepo;
99-
87+
10088
@Autowired
10189
private AasRepository aasRepo;
90+
91+
private static KafkaAdapter<SubmodelEvent> adapterSm = KafkaAdapters.getAdapter("submodel-events", SubmodelEvent.class);
92+
private static KafkaAdapter<AasEvent> adapterAas = KafkaAdapters.getAdapter("aas-events", AasEvent.class);
10293

10394
@Before
104-
public void awaitAssignment() throws InterruptedException {
105-
aasEventListener.awaitTopicAssignment();
106-
submodelEventListener.awaitTopicAssignment();
107-
95+
public void init() {
10896
cleanup();
10997
}
11098

11199
@Test
112100
public void testCreateAas() throws Exception {
113101
AssetAdministrationShell shell = TestShells.shell();
114102
String body = serializer.write(shell);
115-
116-
mvc.perform(MockMvcRequestBuilders.post("/shells").contentType(MediaType.APPLICATION_JSON).content(body).accept(MediaType.APPLICATION_JSON))
117-
.andExpect(MockMvcResultMatchers.status().isCreated())
103+
104+
mvc.perform(MockMvcRequestBuilders.post("/shells").contentType(MediaType.APPLICATION_JSON).content(body).accept(MediaType.APPLICATION_JSON)).andExpect(MockMvcResultMatchers.status().isCreated())
118105
.andExpect(MockMvcResultMatchers.content().json(body));
119-
AasEvent aasEvt = aasEventListener.next();
106+
AasEvent aasEvt = adapterAas.next();
120107
Assert.assertEquals(shell, aasEvt.getAas());
121108
Assert.assertEquals(shell.getId(), aasEvt.getId());
122109
Assert.assertNull(aasEvt.getSubmodelId());
123110
Assert.assertNull(aasEvt.getAssetInformation());
124111
Assert.assertNull(aasEvt.getReference());
125-
126-
Submodel sm = TestSubmodels.createSubmodel("http://submodels/123", "123", "hello");
112+
113+
Submodel sm = TestSubmodels.createSubmodel("http://submodels/123", "123", "hello");
127114
body = serializer.write(sm);
128-
mvc.perform(MockMvcRequestBuilders.post("/submodels").contentType(MediaType.APPLICATION_JSON).content(body).accept(MediaType.APPLICATION_JSON))
129-
.andExpect(MockMvcResultMatchers.status().isCreated());
130-
SubmodelEvent smEvt = submodelEventListener.next();
115+
mvc.perform(MockMvcRequestBuilders.post("/submodels").contentType(MediaType.APPLICATION_JSON).content(body).accept(MediaType.APPLICATION_JSON)).andExpect(MockMvcResultMatchers.status().isCreated());
116+
SubmodelEvent smEvt = adapterSm.next();
131117
Assert.assertEquals(sm, smEvt.getSubmodel());
132118
Assert.assertEquals(sm.getId(), smEvt.getId());
133119
Assert.assertNull(smEvt.getSmElement());
134120
Assert.assertNull(smEvt.getSmElementPath());
135121
}
136122

137-
138123
@Test
139124
public void testFeatureIsEnabled() {
140125
Assert.assertTrue(aasFeature.isEnabled());
141126
Assert.assertTrue(submodelFeature.isEnabled());
142127
}
143128

144129
@After
145-
public void cleanup() throws InterruptedException {
130+
public void dispose() {
131+
cleanup();
132+
}
133+
134+
public void cleanup() {
146135
for (AssetAdministrationShell aas : aasRepo.getAllAas(null, null, new PaginationInfo(null, null)).getResult()) {
147136
aasRepo.deleteAas(aas.getId());
148-
AasEvent aasEvt = aasEventListener.next();
137+
AasEvent aasEvt = adapterAas.next();
149138
Assert.assertEquals(AasEventType.AAS_DELETED, aasEvt.getType());
150139
}
151140

152141
for (Submodel sm : smRepo.getAllSubmodels(new PaginationInfo(null, null)).getResult()) {
153142
smRepo.deleteSubmodel(sm.getId());
154-
SubmodelEvent smEvt = submodelEventListener.next();
143+
SubmodelEvent smEvt = adapterSm.next();
155144
Assert.assertEquals(SubmodelEventType.SM_DELETED, smEvt.getType());
156145
}
146+
adapterSm.assertNoAdditionalMessages();
147+
adapterAas.assertNoAdditionalMessages();
157148
}
158149
}

basyx.aasregistry/basyx.aasregistry-service-basetests/pom.xml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,10 @@
3434
</build>
3535

3636
<dependencies>
37+
<dependency>
38+
<groupId>org.eclipse.digitaltwin.basyx</groupId>
39+
<artifactId>basyx.kafka</artifactId>
40+
</dependency>
3741
<dependency>
3842
<groupId>org.eclipse.digitaltwin.basyx</groupId>
3943
<artifactId>basyx.aasregistry-paths</artifactId>

basyx.aasregistry/basyx.aasregistry-service-basetests/src/main/java/org/eclipse/digitaltwin/basyx/aasregistry/service/tests/integration/AuthorizedAasRegistryTestSuite.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,8 @@ public void reset() throws IOException {
106106

107107
@Test
108108
public void healthEndpointWithoutAuthorization() throws IOException, ParseException {
109+
110+
109111
String expectedHealthEndpointOutput = getStringFromFile("authorization/HealthOutput.json");
110112

111113
String healthEndpointUrl = BASE_URL + "/actuator/health";

basyx.aasregistry/basyx.aasregistry-service-basetests/src/main/java/org/eclipse/digitaltwin/basyx/aasregistry/service/tests/integration/BaseIntegrationTest.java

Lines changed: 19 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import java.util.stream.IntStream;
4242

4343
import org.assertj.core.api.SoftAssertionsProvider.ThrowingRunnable;
44+
import org.eclipse.digitaltwin.aas4j.v3.dataformat.core.DeserializationException;
4445
import org.eclipse.digitaltwin.basyx.aasregistry.client.ApiException;
4546
import org.eclipse.digitaltwin.basyx.aasregistry.client.ApiResponse;
4647
import org.eclipse.digitaltwin.basyx.aasregistry.client.api.RegistryAndDiscoveryInterfaceApi;
@@ -84,6 +85,8 @@
8485
import org.eclipse.digitaltwin.basyx.aasregistry.service.events.RegistryEvent;
8586
import org.eclipse.digitaltwin.basyx.aasregistry.service.events.RegistryEvent.EventType;
8687
import org.eclipse.digitaltwin.basyx.aasregistry.service.tests.TestResourcesLoader;
88+
import org.eclipse.digitaltwin.basyx.kafka.KafkaAdapter;
89+
import org.eclipse.digitaltwin.basyx.kafka.KafkaAdapters;
8790
import org.junit.After;
8891
import org.junit.Before;
8992
import org.junit.Rule;
@@ -133,6 +136,9 @@ public abstract class BaseIntegrationTest {
133136

134137
private ObjectMapper mapper = new ObjectMapper();
135138

139+
140+
private final KafkaAdapter<RegistryEvent> adapter = KafkaAdapters.getAdapter("aas-registry", RegistryEvent.class);
141+
136142
@Rule
137143
public TestResourcesLoader resourceLoader = new TestResourcesLoader(BaseIntegrationTest.class.getPackageName(), mapper);
138144

@@ -141,21 +147,22 @@ public abstract class BaseIntegrationTest {
141147
@Before
142148
public void setUp() throws Exception {
143149
initClient();
150+
adapter.skipMessages();
144151
cleanup();
152+
adapter.skipMessages();
145153
}
146154

155+
147156
protected void initClient() throws Exception {
148157
api = new RegistryAndDiscoveryInterfaceApi("http", "127.0.0.1", port);
149158
}
150159

151-
protected void cleanup() throws ApiException, InterruptedException {
152-
queue().pullAdditionalMessages();
160+
protected void cleanup() throws ApiException, InterruptedException, DeserializationException {
153161
GetAssetAdministrationShellDescriptorsResult result = api.getAllAssetAdministrationShellDescriptors(null, null, null, null);
154162
for (AssetAdministrationShellDescriptor eachDescriptor : result.getResult()) {
155163
api.deleteAssetAdministrationShellDescriptorById(eachDescriptor.getId());
156164
assertThatEventWasSend(RegistryEvent.builder().id(eachDescriptor.getId()).type(EventType.AAS_UNREGISTERED).build());
157165
}
158-
queue().pullAdditionalMessages();
159166
}
160167

161168
@Test
@@ -177,7 +184,7 @@ public void whenWritingParallel_transactionManagementWorks() throws ApiException
177184
assertThat(IntStream.iterate(0, i -> i + 1).limit(300).parallel().mapToObj(op).filter(i -> i > 300).findAny()).isEmpty();
178185
assertThat(api.getAssetAdministrationShellDescriptorById(descriptor.getId()).getSubmodelDescriptors()).hasSize(300);
179186
for (int i = 0; i < 300; i++) {
180-
RegistryEvent evt = queue().poll();
187+
RegistryEvent evt = adapter.next();
181188
assertThat(evt.getId()).isEqualTo(descriptor.getId());
182189
assertThat(Integer.parseInt(evt.getSubmodelId())).isGreaterThanOrEqualTo(0).isLessThan(300);
183190

@@ -227,7 +234,7 @@ public void whenDeleteAll_thenAllDescriptorsAreRemoved() throws ApiException {
227234
HashSet<RegistryEvent> events = new HashSet<>();
228235
// we do not have a specific order, so read all events first
229236
for (int i = 0; i < DELETE_ALL_TEST_INSTANCE_COUNT; i++) {
230-
events.add(queue().poll());
237+
events.add(adapter.next());
231238
}
232239
for (int i = 0; i < DELETE_ALL_TEST_INSTANCE_COUNT; i++) {
233240
assertThat(events.remove(RegistryEvent.builder().id("id_" + i).type(EventType.AAS_UNREGISTERED).build())).isTrue();
@@ -236,7 +243,7 @@ public void whenDeleteAll_thenAllDescriptorsAreRemoved() throws ApiException {
236243
}
237244

238245
@Test
239-
public void whenCreateAndDeleteDescriptors_thenAllDescriptorsAreRemoved() throws IOException, InterruptedException, TimeoutException, ApiException {
246+
public void whenCreateAndDeleteDescriptors_thenAllDescriptorsAreRemoved() throws IOException, InterruptedException, TimeoutException, ApiException, DeserializationException {
240247
List<AssetAdministrationShellDescriptor> deployed = initialize();
241248
List<AssetAdministrationShellDescriptor> all = api.getAllAssetAdministrationShellDescriptors(null, null, null, null).getResult();
242249
assertThat(all).containsExactlyInAnyOrderElementsOf(deployed);
@@ -248,11 +255,11 @@ public void whenCreateAndDeleteDescriptors_thenAllDescriptorsAreRemoved() throws
248255
all = api.getAllAssetAdministrationShellDescriptors(null, null, null, null).getResult();
249256
assertThat(all).isEmpty();
250257

251-
queue().pullAdditionalMessages();
258+
adapter.assertNoAdditionalMessages();
252259
}
253260

254261
@Test
255-
public void whenRegisterAndUnregisterSubmodel_thenSubmodelIsCreatedAndDeleted() throws IOException, InterruptedException, TimeoutException, ApiException {
262+
public void whenRegisterAndUnregisterSubmodel_thenSubmodelIsCreatedAndDeleted() throws IOException, InterruptedException, TimeoutException, ApiException, DeserializationException {
256263
List<AssetAdministrationShellDescriptor> deployed = initialize();
257264
List<AssetAdministrationShellDescriptor> all = api.getAllAssetAdministrationShellDescriptors(null, null, null, null).getResult();
258265
assertThat(all).asList().containsExactlyInAnyOrderElementsOf(deployed);
@@ -280,8 +287,7 @@ public void whenRegisterAndUnregisterSubmodel_thenSubmodelIsCreatedAndDeleted()
280287

281288
aasDescriptor = api.getAssetAdministrationShellDescriptorById(aasId);
282289
assertThat(aasDescriptor.getSubmodelDescriptors()).doesNotContain(toRegister);
283-
284-
queue().pullAdditionalMessages();
290+
adapter.assertNoAdditionalMessages();
285291
}
286292

287293
@Test
@@ -615,14 +621,14 @@ private void assertRestResourceAvailable(String location) throws IOException {
615621
}
616622

617623
private void deleteAdminAssetShellDescriptor(String aasId) throws ApiException {
618-
queue().reset();
624+
adapter.assertNoAdditionalMessages();
619625

620626
int response = api.deleteAssetAdministrationShellDescriptorByIdWithHttpInfo(URLEncoder.encode(aasId, StandardCharsets.UTF_8)).getStatusCode();
621627
assertThat(response).isEqualTo(NO_CONTENT);
622628
assertThatEventWasSend(RegistryEvent.builder().id(aasId).type(EventType.AAS_UNREGISTERED).build());
623629
}
624630

625-
private List<AssetAdministrationShellDescriptor> initialize() throws IOException, InterruptedException, TimeoutException, ApiException {
631+
private List<AssetAdministrationShellDescriptor> initialize() throws IOException, TimeoutException, ApiException {
626632
List<AssetAdministrationShellDescriptor> descriptors = resourceLoader.loadRepositoryDefinition(AssetAdministrationShellDescriptor.class);
627633
List<org.eclipse.digitaltwin.basyx.aasregistry.model.AssetAdministrationShellDescriptor> eventContent = resourceLoader
628634
.loadRepositoryDefinition(org.eclipse.digitaltwin.basyx.aasregistry.model.AssetAdministrationShellDescriptor.class);
@@ -639,7 +645,7 @@ private List<AssetAdministrationShellDescriptor> initialize() throws IOException
639645
}
640646

641647
private void assertThatEventWasSend(RegistryEvent expected) {
642-
RegistryEvent evt = queue().poll();
648+
RegistryEvent evt = adapter.next();
643649
assertThat(evt).isEqualTo(expected);
644650
}
645651

@@ -674,5 +680,4 @@ private <O,I> O convert(I in, Class<I> inCls, Class<O> outCls) throws JsonProces
674680
return mapper.readerFor(outCls).readValue(data);
675681
}
676682

677-
public abstract EventQueue queue();
678683
}

0 commit comments

Comments
 (0)