Skip to content

Commit c10b01b

Browse files
committed
Add null checks
1 parent beeb5c9 commit c10b01b

7 files changed

Lines changed: 185 additions & 144 deletions

File tree

basyx.aasregistry/basyx.aasregistry-service-release-kafka-mem/src/test/java/org/eclipse/digitaltwin/basyx/aasregistry/service/storage/memory/KafkaEventsInMemoryStorageIntegrationTest.java

Lines changed: 21 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -26,23 +26,23 @@
2626

2727
import java.util.Map;
2828
import java.util.concurrent.CountDownLatch;
29-
import java.util.concurrent.TimeUnit;
3029

3130
import org.apache.kafka.common.TopicPartition;
3231
import org.eclipse.digitaltwin.basyx.aasregistry.service.tests.integration.BaseIntegrationTest;
3332
import org.eclipse.digitaltwin.basyx.aasregistry.service.tests.integration.EventQueue;
33+
import org.slf4j.Logger;
34+
import org.slf4j.LoggerFactory;
3435
import org.springframework.beans.factory.annotation.Autowired;
3536
import org.springframework.kafka.annotation.KafkaHandler;
3637
import org.springframework.kafka.annotation.KafkaListener;
3738
import org.springframework.kafka.listener.ConsumerSeekAware;
39+
import org.springframework.messaging.handler.annotation.Payload;
3840
import org.springframework.stereotype.Component;
3941
import org.springframework.test.context.TestPropertySource;
4042

4143
import com.fasterxml.jackson.databind.ObjectMapper;
4244

43-
@TestPropertySource(
44-
properties = {"spring.profiles.active=kafkaEvents,inMemoryStorage",
45-
"spring.kafka.bootstrap-servers=localhost:9092"})
45+
@TestPropertySource(properties = { "spring.profiles.active=kafkaEvents,inMemoryStorage", "spring.kafka.bootstrap-servers=localhost:9092" })
4646
public class KafkaEventsInMemoryStorageIntegrationTest extends BaseIntegrationTest {
4747

4848
@Autowired
@@ -54,43 +54,47 @@ public void setUp() throws Exception {
5454
super.setUp();
5555
}
5656

57-
5857
@Override
5958
public EventQueue queue() {
6059
return listener.queue;
6160
}
6261

63-
@KafkaListener(topics = "aas-registry", batch = "false", groupId = "kafka-test", autoStartup = "true" )
62+
@KafkaListener(topics = "aas-registry", batch = "false", groupId = "kafka-test", autoStartup = "true")
6463
@Component
6564
private static class RegistrationEventKafkaListener implements ConsumerSeekAware {
66-
65+
66+
private static final Logger log = LoggerFactory.getLogger(RegistrationEventKafkaListener.class);
6767
private final EventQueue queue;
6868
private final CountDownLatch latch = new CountDownLatch(1);
69-
69+
7070
@SuppressWarnings("unused")
7171
public RegistrationEventKafkaListener(ObjectMapper mapper) {
7272
this.queue = new EventQueue(mapper);
7373
}
74-
74+
7575
@KafkaHandler
76-
public void receiveMessage(String content) {
76+
public void receiveMessage(@Payload(required = false) String content) {
77+
if (content == null) {
78+
log.warn("Payload is null – topic aas-registry not yet ready?");
79+
return;
80+
}
7781
queue.offer(content);
7882
}
7983

8084
@Override
81-
public void onPartitionsAssigned(Map<TopicPartition, Long> assignments,
82-
ConsumerSeekCallback callback) {
85+
public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
8386
for (TopicPartition eachPartition : assignments.keySet()) {
8487
if ("aas-registry".equals(eachPartition.topic())) {
8588
latch.countDown();
8689
}
87-
}
90+
}
8891
}
89-
92+
9093
public void awaitTopicAssignment() throws InterruptedException {
91-
// if (!latch.await(30, TimeUnit.MINUTES)) {
92-
// throw new RuntimeException("Timeout occured while waiting for partition assignment. Is kafka running?");
93-
// }
94+
// if (!latch.await(30, TimeUnit.MINUTES)) {
95+
// throw new RuntimeException("Timeout occured while waiting for partition
96+
// assignment. Is kafka running?");
97+
// }
9498
}
9599
}
96100

basyx.aasregistry/basyx.aasregistry-service-release-kafka-mongodb/src/test/java/org/eclipse/digitaltwin/basyx/aasregistry/service/storage/mongodb/KafkaEventsMongoDbStorageIntegrationTest.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,19 +24,20 @@
2424
******************************************************************************/
2525
package org.eclipse.digitaltwin.basyx.aasregistry.service.storage.mongodb;
2626

27-
import java.util.List;
2827
import java.util.Map;
2928
import java.util.concurrent.CountDownLatch;
30-
import java.util.concurrent.TimeUnit;
3129

3230
import org.apache.kafka.common.TopicPartition;
3331
import org.eclipse.digitaltwin.basyx.aasregistry.service.tests.integration.BaseIntegrationTest;
3432
import org.eclipse.digitaltwin.basyx.aasregistry.service.tests.integration.EventQueue;
33+
import org.slf4j.Logger;
34+
import org.slf4j.LoggerFactory;
3535
import org.springframework.beans.factory.annotation.Autowired;
3636
import org.springframework.beans.factory.annotation.Value;
3737
import org.springframework.kafka.annotation.KafkaHandler;
3838
import org.springframework.kafka.annotation.KafkaListener;
3939
import org.springframework.kafka.listener.ConsumerSeekAware;
40+
import org.springframework.messaging.handler.annotation.Payload;
4041
import org.springframework.stereotype.Component;
4142
import org.springframework.test.context.TestPropertySource;
4243

@@ -65,6 +66,7 @@ public EventQueue queue() {
6566
@Component
6667
public static class RegistrationEventKafkaListener implements ConsumerSeekAware {
6768

69+
private static final Logger log = LoggerFactory.getLogger(RegistrationEventKafkaListener.class);
6870
private final EventQueue queue;
6971
private final CountDownLatch latch = new CountDownLatch(1);
7072

@@ -80,7 +82,11 @@ public EventQueue getQueue() {
8082
}
8183

8284
@KafkaHandler
83-
public void receiveMessage(String content) {
85+
public void receiveMessage(@Payload(required = false)String content) {
86+
if (content == null) {
87+
log.warn("Payload is null – topic aas-registry not yet ready?");
88+
return;
89+
}
8490
queue.offer(content);
8591
}
8692

basyx.aasrepository/basyx.aasrepository-feature-kafka/src/test/java/org/eclipse/digitaltwin/basyx/aasrepository/feature/kafka/AasEventKafkaListener.java

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -34,17 +34,22 @@
3434
import org.eclipse.digitaltwin.aas4j.v3.dataformat.core.DeserializationException;
3535
import org.eclipse.digitaltwin.aas4j.v3.dataformat.json.JsonDeserializer;
3636
import org.eclipse.digitaltwin.basyx.aasrepository.feature.kafka.events.model.AasEvent;
37+
import org.slf4j.Logger;
38+
import org.slf4j.LoggerFactory;
3739
import org.springframework.kafka.annotation.KafkaHandler;
3840
import org.springframework.kafka.annotation.KafkaListener;
3941
import org.springframework.kafka.listener.ConsumerSeekAware;
42+
import org.springframework.messaging.handler.annotation.Payload;
4043
import org.springframework.stereotype.Component;
44+
4145
/**
4246
* @author geso02 (Sonnenberg DFKI GmbH)
4347
*/
4448
@KafkaListener(topics = TestApplication.KAFKA_AAS_TOPIC, batch = "false", groupId = TestApplication.KAFKA_GROUP_ID, autoStartup = "true")
45-
@Component
49+
@Component
4650
public class AasEventKafkaListener implements ConsumerSeekAware {
47-
51+
52+
private static final Logger log = LoggerFactory.getLogger(AasEventKafkaListener.class);
4853
private final LinkedBlockingDeque<AasEvent> evt = new LinkedBlockingDeque<AasEvent>();
4954
private final JsonDeserializer deserializer;
5055
private final CountDownLatch latch = new CountDownLatch(1);
@@ -54,7 +59,11 @@ public AasEventKafkaListener(JsonDeserializer deserializer) {
5459
}
5560

5661
@KafkaHandler
57-
public void receiveMessage(String content) {
62+
public void receiveMessage(@Payload(required = false) String content) {
63+
if (content == null) {
64+
log.warn("Payload is null – topic {} not yet ready?", TestApplication.KAFKA_AAS_TOPIC);
65+
return;
66+
}
5867
try {
5968
AasEvent event = deserializer.read(content, AasEvent.class);
6069
evt.offerFirst(event);
@@ -81,8 +90,9 @@ public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, Consumer
8190
}
8291

8392
public void awaitTopicAssignment() throws InterruptedException {
84-
// if (!latch.await(30, TimeUnit.MINUTES)) {
85-
// throw new RuntimeException("Timeout occured while waiting for partition assignment. Is kafka running?");
86-
// }
93+
// if (!latch.await(30, TimeUnit.MINUTES)) {
94+
// throw new RuntimeException("Timeout occured while waiting for partition
95+
// assignment. Is kafka running?");
96+
// }
8797
}
8898
}

basyx.submodelregistry/basyx.submodelregistry-service-release-kafka-mem/src/test/java/org/eclipse/digitaltwin/basyx/submodelregistry/service/storage/memory/KafkaEventsInMemoryStorageIntegrationTest.java

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,19 +24,20 @@
2424
******************************************************************************/
2525
package org.eclipse.digitaltwin.basyx.submodelregistry.service.storage.memory;
2626

27-
import java.util.List;
2827
import java.util.Map;
2928
import java.util.concurrent.CountDownLatch;
30-
import java.util.concurrent.TimeUnit;
3129

3230
import org.apache.kafka.common.TopicPartition;
3331
import org.eclipse.digitaltwin.basyx.submodelregistry.client.ApiException;
3432
import org.eclipse.digitaltwin.basyx.submodelregistry.service.tests.integration.BaseIntegrationTest;
3533
import org.eclipse.digitaltwin.basyx.submodelregistry.service.tests.integration.EventQueue;
34+
import org.slf4j.Logger;
35+
import org.slf4j.LoggerFactory;
3636
import org.springframework.beans.factory.annotation.Autowired;
3737
import org.springframework.kafka.annotation.KafkaHandler;
3838
import org.springframework.kafka.annotation.KafkaListener;
3939
import org.springframework.kafka.listener.ConsumerSeekAware;
40+
import org.springframework.messaging.handler.annotation.Payload;
4041
import org.springframework.stereotype.Component;
4142
import org.springframework.test.context.TestPropertySource;
4243

@@ -67,7 +68,8 @@ public EventQueue queue() {
6768
@KafkaListener(topics = "submodel-registry", batch = "false", groupId = "kafka-test", autoStartup = "true" )
6869
@Component
6970
private static class RegistrationEventKafkaListener implements ConsumerSeekAware {
70-
71+
72+
private static final Logger log = LoggerFactory.getLogger(RegistrationEventKafkaListener.class);
7173
private final EventQueue queue;
7274
private final CountDownLatch latch = new CountDownLatch(1);
7375

@@ -78,7 +80,11 @@ public RegistrationEventKafkaListener(ObjectMapper mapper) {
7880
}
7981

8082
@KafkaHandler
81-
public void receiveMessage(String content) {
83+
public void receiveMessage(@Payload(required = false)String content) {
84+
if (content == null) {
85+
log.warn("Payload is null – topic submodel-registry not yet ready?");
86+
return;
87+
}
8288
queue.offer(content);
8389
}
8490

basyx.submodelregistry/basyx.submodelregistry-service-release-kafka-mongodb/src/test/java/org/eclipse/digitaltwin/basyx/submodelregistry/service/storage/mongodb/KafkaEventsMongoDbStorageIntegrationTest.java

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,21 +24,21 @@
2424
******************************************************************************/
2525
package org.eclipse.digitaltwin.basyx.submodelregistry.service.storage.mongodb;
2626

27-
import java.util.List;
2827
import java.util.Map;
2928
import java.util.concurrent.CountDownLatch;
30-
import java.util.concurrent.TimeUnit;
3129

3230
import org.apache.kafka.common.TopicPartition;
3331
import org.eclipse.digitaltwin.basyx.submodelregistry.client.ApiException;
3432
import org.eclipse.digitaltwin.basyx.submodelregistry.service.tests.integration.BaseIntegrationTest;
3533
import org.eclipse.digitaltwin.basyx.submodelregistry.service.tests.integration.EventQueue;
36-
import org.junit.Before;
34+
import org.slf4j.Logger;
35+
import org.slf4j.LoggerFactory;
3736
import org.springframework.beans.factory.annotation.Autowired;
3837
import org.springframework.beans.factory.annotation.Value;
3938
import org.springframework.kafka.annotation.KafkaHandler;
4039
import org.springframework.kafka.annotation.KafkaListener;
4140
import org.springframework.kafka.listener.ConsumerSeekAware;
41+
import org.springframework.messaging.handler.annotation.Payload;
4242
import org.springframework.stereotype.Component;
4343
import org.springframework.test.context.TestPropertySource;
4444

@@ -67,7 +67,8 @@ public EventQueue queue() {
6767
@KafkaListener(topics = "submodel-registry", batch = "false", groupId = "kafka-test", autoStartup = "true" )
6868
@Component
6969
public static class RegistrationEventKafkaListener implements ConsumerSeekAware {
70-
70+
71+
private static final Logger log = LoggerFactory.getLogger(RegistrationEventKafkaListener.class);
7172
private final EventQueue queue;
7273
private final CountDownLatch latch = new CountDownLatch(1);
7374

@@ -79,7 +80,11 @@ public RegistrationEventKafkaListener(ObjectMapper mapper) {
7980
}
8081

8182
@KafkaHandler
82-
public void receiveMessage(String content) {
83+
public void receiveMessage(@Payload(required = false)String content) {
84+
if (content == null) {
85+
log.warn("Payload is null – topic submodel-registry not yet ready?");
86+
return;
87+
}
8388
queue.offer(content);
8489
}
8590

0 commit comments

Comments
 (0)