3939import org .apache .kafka .common .serialization .StringDeserializer ;
4040import org .eclipse .digitaltwin .aas4j .v3 .dataformat .core .DeserializationException ;
4141import org .eclipse .digitaltwin .aas4j .v3 .dataformat .json .JsonDeserializer ;
42+ import org .slf4j .Logger ;
43+ import org .slf4j .LoggerFactory ;
4244
4345public class KafkaAdapter <T > {
46+
47+ private static final Logger LOG = LoggerFactory .getLogger (KafkaAdapter .class );
48+
4449 private final KafkaConsumer <String , String > consumer ;
4550
4651 private final String bootstrapServers ;
@@ -92,33 +97,38 @@ private KafkaConsumer<String, String> init() {
9297 }
9398
9499 private void awaitAssignment () {
95- long deadline = System .currentTimeMillis () + assignmentTimeout .toMillis ();
100+ LOG .info ("Await Assignment" );
101+ long start = System .currentTimeMillis ();
102+ long deadline = start + assignmentTimeout .toMillis ();
96103 while (consumer .assignment ().isEmpty () && System .currentTimeMillis () < deadline ) {
97104 consumer .poll (Duration .ofMillis (100 ));
98105 }
99106 if (consumer .assignment ().isEmpty ()) {
100107 throw new RuntimeException ("Failed to wait for topic assignment. Is KAFKA running?" );
101108 }
109+ LOG .info ("Partitions {} assigned after {} ms." + consumer .assignment (), System .currentTimeMillis () - start );
102110 }
103111
104112 private String nextMessage () {
105113 return nextMessage (pollTimeout );
106114 }
107115
108116 private String nextMessage (Duration duration ) {
109-
117+ LOG . info ( "Reading Kafka message" );
110118 long deadline = System .currentTimeMillis () + duration .toMillis ();
111119
112120 while (deque .isEmpty () && System .currentTimeMillis () < deadline ) {
113- ConsumerRecords <String , String > records = consumer .poll (Duration .ofMillis (200 ));
121+ ConsumerRecords <String , String > records = consumer .poll (Duration .ofMillis (100 ));
114122 for (ConsumerRecord <String , String > record : records ) {
115123 this .deque .add (record .value ());
116- consumer .commitSync ();
124+ consumer .commitAsync ();
117125 }
118126 }
119127 if (!deque .isEmpty ()) {
128+ LOG .info ("Got message" );
120129 return deque .remove ();
121130 }
131+ LOG .info ("Failed to receive message" );
122132 return null ;
123133 }
124134
@@ -136,17 +146,19 @@ public T next() {
136146 }
137147
138148 public void assertNoAdditionalMessages () {
139- String next = nextMessage (Duration .ofSeconds ( 1 ));
149+ String next = nextMessage (Duration .ofMillis ( 100 ));
140150 if (next != null ) {
141151 throw new RuntimeException ("Got an additional message within 1 second: \n " + next );
142152 }
143153 }
144154
145155 public void close () {
156+ LOG .info ("Dispose" );
146157 this .consumer .close ();
147158 }
148159
149160 public void skipMessages () {
161+ LOG .info ("SkipMessages" );
150162 while (nextMessage (Duration .ofMillis (100 )) != null );
151163 }
152164
0 commit comments