Skip to content

Commit e48bccf

Browse files
committed
Use singleton kafka consumer and poll the messages asynchronously
1 parent 164cd2f commit e48bccf

11 files changed

Lines changed: 187 additions & 196 deletions

File tree

basyx.aasenvironment/basyx.aasenvironment.component/src/test/java/org/eclipse/digitaltwin/basyx/aasenvironment/component/KafkaEventsInMemoryStorageIntegrationTest.java

Lines changed: 2 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525
******************************************************************************/
2626
package org.eclipse.digitaltwin.basyx.aasenvironment.component;
2727

28-
import org.eclipse.digitaltwin.aas4j.v3.dataformat.core.DeserializationException;
2928
import org.eclipse.digitaltwin.aas4j.v3.dataformat.json.JsonSerializer;
3029
import org.eclipse.digitaltwin.aas4j.v3.model.AssetAdministrationShell;
3130
import org.eclipse.digitaltwin.aas4j.v3.model.Submodel;
@@ -43,10 +42,8 @@
4342
import org.eclipse.digitaltwin.basyx.submodelservice.feature.kafka.events.model.SubmodelEvent;
4443
import org.eclipse.digitaltwin.basyx.submodelservice.feature.kafka.events.model.SubmodelEventType;
4544
import org.junit.After;
46-
import org.junit.AfterClass;
4745
import org.junit.Assert;
4846
import org.junit.Before;
49-
import org.junit.BeforeClass;
5047
import org.junit.Test;
5148
import org.junit.runner.RunWith;
5249
import org.springframework.beans.factory.annotation.Autowired;
@@ -91,24 +88,11 @@ public class KafkaEventsInMemoryStorageIntegrationTest {
9188
@Autowired
9289
private AasRepository aasRepo;
9390

94-
private static KafkaAdapter<SubmodelEvent> adapterSm;
95-
private static KafkaAdapter<AasEvent> adapterAas;
96-
97-
@BeforeClass
98-
public static void initAdapter() {
99-
adapterSm = new KafkaAdapter<>("localhost:9092", "submodel-events", SubmodelEvent.class);
100-
adapterAas = new KafkaAdapter<>("localhost:9092", "aas-events", AasEvent.class);
101-
}
102-
103-
@AfterClass
104-
public static void disposeAdapter() {
105-
adapterSm.close();
106-
adapterAas.close();
107-
}
91+
private static KafkaAdapter<SubmodelEvent> adapterSm = KafkaAdapters.getAdapter("submodel-events", SubmodelEvent.class);
92+
private static KafkaAdapter<AasEvent> adapterAas = KafkaAdapters.getAdapter("aas-events", AasEvent.class);
10893

10994
@Before
11095
public void init() {
111-
11296
cleanup();
11397
}
11498

basyx.aasregistry/basyx.aasregistry-service-basetests/src/main/java/org/eclipse/digitaltwin/basyx/aasregistry/service/tests/integration/BaseIntegrationTest.java

Lines changed: 13 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@
8686
import org.eclipse.digitaltwin.basyx.aasregistry.service.events.RegistryEvent.EventType;
8787
import org.eclipse.digitaltwin.basyx.aasregistry.service.tests.TestResourcesLoader;
8888
import org.eclipse.digitaltwin.basyx.kafka.KafkaAdapter;
89+
import org.eclipse.digitaltwin.basyx.kafka.KafkaAdapters;
8990
import org.junit.After;
9091
import org.junit.Before;
9192
import org.junit.Rule;
@@ -135,35 +136,33 @@ public abstract class BaseIntegrationTest {
135136

136137
private ObjectMapper mapper = new ObjectMapper();
137138

139+
140+
private final KafkaAdapter<RegistryEvent> adapter = KafkaAdapters.getAdapter("aas-registry", RegistryEvent.class);
141+
138142
@Rule
139143
public TestResourcesLoader resourceLoader = new TestResourcesLoader(BaseIntegrationTest.class.getPackageName(), mapper);
140144

141145
protected RegistryAndDiscoveryInterfaceApi api;
142146

143147
@Before
144148
public void setUp() throws Exception {
145-
adapter().skipMessages();
146149
initClient();
150+
adapter.skipMessages();
147151
cleanup();
148152
}
149153

154+
150155
protected void initClient() throws Exception {
151156
api = new RegistryAndDiscoveryInterfaceApi("http", "127.0.0.1", port);
152157
}
153158

154159
protected void cleanup() throws ApiException, InterruptedException, DeserializationException {
155-
adapter().assertNoAdditionalMessages();
156160
GetAssetAdministrationShellDescriptorsResult result = api.getAllAssetAdministrationShellDescriptors(null, null, null, null);
157161
for (AssetAdministrationShellDescriptor eachDescriptor : result.getResult()) {
158162
api.deleteAssetAdministrationShellDescriptorById(eachDescriptor.getId());
159163
assertThatEventWasSend(RegistryEvent.builder().id(eachDescriptor.getId()).type(EventType.AAS_UNREGISTERED).build());
160164
}
161-
adapter().assertNoAdditionalMessages();
162-
}
163-
164-
165-
protected final KafkaAdapter<RegistryEvent> adapter() {
166-
return new KafkaAdapter<>("localhost:9092", "aas-registry", RegistryEvent.class);
165+
adapter.assertNoAdditionalMessages();
167166
}
168167

169168
@Test
@@ -185,7 +184,7 @@ public void whenWritingParallel_transactionManagementWorks() throws ApiException
185184
assertThat(IntStream.iterate(0, i -> i + 1).limit(300).parallel().mapToObj(op).filter(i -> i > 300).findAny()).isEmpty();
186185
assertThat(api.getAssetAdministrationShellDescriptorById(descriptor.getId()).getSubmodelDescriptors()).hasSize(300);
187186
for (int i = 0; i < 300; i++) {
188-
RegistryEvent evt = adapter().next();
187+
RegistryEvent evt = adapter.next();
189188
assertThat(evt.getId()).isEqualTo(descriptor.getId());
190189
assertThat(Integer.parseInt(evt.getSubmodelId())).isGreaterThanOrEqualTo(0).isLessThan(300);
191190

@@ -235,7 +234,7 @@ public void whenDeleteAll_thenAllDescriptorsAreRemoved() throws ApiException {
235234
HashSet<RegistryEvent> events = new HashSet<>();
236235
// we do not have a specific order, so read all events first
237236
for (int i = 0; i < DELETE_ALL_TEST_INSTANCE_COUNT; i++) {
238-
events.add(adapter().next());
237+
events.add(adapter.next());
239238
}
240239
for (int i = 0; i < DELETE_ALL_TEST_INSTANCE_COUNT; i++) {
241240
assertThat(events.remove(RegistryEvent.builder().id("id_" + i).type(EventType.AAS_UNREGISTERED).build())).isTrue();
@@ -256,7 +255,7 @@ public void whenCreateAndDeleteDescriptors_thenAllDescriptorsAreRemoved() throws
256255
all = api.getAllAssetAdministrationShellDescriptors(null, null, null, null).getResult();
257256
assertThat(all).isEmpty();
258257

259-
adapter().assertNoAdditionalMessages();
258+
adapter.assertNoAdditionalMessages();
260259
}
261260

262261
@Test
@@ -288,7 +287,7 @@ public void whenRegisterAndUnregisterSubmodel_thenSubmodelIsCreatedAndDeleted()
288287

289288
aasDescriptor = api.getAssetAdministrationShellDescriptorById(aasId);
290289
assertThat(aasDescriptor.getSubmodelDescriptors()).doesNotContain(toRegister);
291-
adapter().assertNoAdditionalMessages();
290+
adapter.assertNoAdditionalMessages();
292291
}
293292

294293
@Test
@@ -622,7 +621,7 @@ private void assertRestResourceAvailable(String location) throws IOException {
622621
}
623622

624623
private void deleteAdminAssetShellDescriptor(String aasId) throws ApiException {
625-
adapter().assertNoAdditionalMessages();
624+
adapter.assertNoAdditionalMessages();
626625

627626
int response = api.deleteAssetAdministrationShellDescriptorByIdWithHttpInfo(URLEncoder.encode(aasId, StandardCharsets.UTF_8)).getStatusCode();
628627
assertThat(response).isEqualTo(NO_CONTENT);
@@ -646,7 +645,7 @@ private List<AssetAdministrationShellDescriptor> initialize() throws IOException
646645
}
647646

648647
private void assertThatEventWasSend(RegistryEvent expected) {
649-
RegistryEvent evt = adapter().next();
648+
RegistryEvent evt = adapter.next();
650649
assertThat(evt).isEqualTo(expected);
651650
}
652651

basyx.aasrepository/basyx.aasrepository-feature-kafka/src/test/java/org/eclipse/digitaltwin/basyx/aasrepository/feature/kafka/KafkaEventsInMemoryStorageIntegrationTest.java

Lines changed: 2 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -46,11 +46,10 @@
4646
import org.eclipse.digitaltwin.basyx.core.filerepository.InMemoryFileRepository;
4747
import org.eclipse.digitaltwin.basyx.core.pagination.PaginationInfo;
4848
import org.eclipse.digitaltwin.basyx.kafka.KafkaAdapter;
49+
import org.eclipse.digitaltwin.basyx.kafka.KafkaAdapters;
4950
import org.junit.After;
50-
import org.junit.AfterClass;
5151
import org.junit.Assert;
5252
import org.junit.Before;
53-
import org.junit.BeforeClass;
5453
import org.junit.Test;
5554
import org.junit.runner.RunWith;
5655
import org.springframework.beans.factory.annotation.Autowired;
@@ -73,17 +72,7 @@
7372
@TestPropertySource(properties = { KafkaAasRepositoryFeature.FEATURENAME + ".enabled=true", "spring.kafka.bootstrap-servers=localhost:9092", KafkaAasRepositoryFeature.FEATURENAME + ".topic.name=" + TestApplication.KAFKA_AAS_TOPIC })
7473
public class KafkaEventsInMemoryStorageIntegrationTest {
7574

76-
private static KafkaAdapter<AasEvent> adapter;
77-
78-
@BeforeClass
79-
public static void initAdapter() {
80-
adapter = new KafkaAdapter<>("localhost:9092", TestApplication.KAFKA_AAS_TOPIC, AasEvent.class);
81-
}
82-
83-
@AfterClass
84-
public static void disposeSdapter() {
85-
adapter.close();
86-
}
75+
private static KafkaAdapter<AasEvent> adapter = KafkaAdapters.getAdapter(TestApplication.KAFKA_AAS_TOPIC, AasEvent.class);
8776

8877
@Autowired
8978
private KafkaAasRepositoryFeature feature;

basyx.aasrepository/basyx.aasrepository.component/src/test/java/org/eclipse/digitaltwin/basyx/aasrepository/component/KafkaFeatureEnabledSmokeTest.java

Lines changed: 1 addition & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -69,17 +69,8 @@
6969
KafkaAasRepositoryFeature.FEATURENAME + ".topic.name=aas-events" })
7070
public class KafkaFeatureEnabledSmokeTest {
7171

72-
private static KafkaAdapter<AasEvent> adapter;
72+
private static KafkaAdapter<AasEvent> adapter = KafkaAdapters.getAdapter(TestApplication.KAFKA_AAS_TOPIC, AasEvent.class);
7373

74-
@BeforeClass
75-
public static void initAdapter() {
76-
adapter = new KafkaAdapter<>("localhost:9092", TestApplication.KAFKA_AAS_TOPIC, AasEvent.class);
77-
}
78-
79-
@AfterClass
80-
public static void disposeAdapter() {
81-
adapter.close();
82-
}
8374

8475
@LocalServerPort
8576
private int port;

0 commit comments

Comments
 (0)