Skip to content

Commit 4ff355c

Browse files
authored
Merge pull request #1 from danube-messaging/int_tests
correct the integration tests and tests on pull requests
2 parents d102d49 + 460bd48 commit 4ff355c

6 files changed

Lines changed: 214 additions & 153 deletions

File tree

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
name: Integration Tests
2+
3+
on:
4+
push:
5+
branches: [main]
6+
pull_request:
7+
8+
jobs:
9+
integration-tests:
10+
runs-on: ubuntu-latest
11+
12+
steps:
13+
- name: Checkout code
14+
uses: actions/checkout@v4
15+
16+
- name: Set up Java
17+
uses: actions/setup-java@v4
18+
with:
19+
distribution: temurin
20+
java-version: '21'
21+
cache: maven
22+
23+
- name: Start test cluster
24+
working-directory: docker
25+
run: docker compose up -d
26+
27+
- name: Wait for broker to be healthy
28+
working-directory: docker
29+
run: |
30+
echo "Waiting for broker to become healthy..."
31+
for i in $(seq 1 30); do
32+
status=$(docker inspect --format='{{.State.Health.Status}}' danube-test-broker 2>/dev/null || echo "missing")
33+
if [ "$status" = "healthy" ]; then
34+
echo "Broker is healthy."
35+
break
36+
fi
37+
if [ "$i" -eq 30 ]; then
38+
echo "Broker failed to become healthy (status: $status)"
39+
docker compose logs broker
40+
exit 1
41+
fi
42+
echo " attempt $i/30 — status: $status"
43+
sleep 5
44+
done
45+
46+
- name: Run integration tests
47+
run: mvn -pl danube-client -am verify -P integration-tests
48+
49+
- name: Print broker logs on failure
50+
if: failure()
51+
working-directory: docker
52+
run: docker compose logs broker
53+
54+
- name: Stop test cluster
55+
if: always()
56+
working-directory: docker
57+
run: docker compose down -v

danube-client/src/test/java/com/danubemessaging/client/it/FanoutExclusiveIT.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77
import com.danubemessaging.client.model.StreamMessage;
88
import org.junit.jupiter.api.Test;
99

10-
import java.time.Duration;
1110
import java.util.List;
1211
import java.util.Map;
1312
import java.util.Set;
@@ -38,7 +37,8 @@ void fanoutExclusive() throws Exception {
3837
.build();
3938
producer.create();
4039

41-
record ConsumerEntry(String name, Consumer consumer, TestHelpers.MessageCollector collector) {}
40+
record ConsumerEntry(String name, Consumer consumer, TestHelpers.MessageCollector collector) {
41+
}
4242

4343
List<ConsumerEntry> consumers = new CopyOnWriteArrayList<>();
4444
for (int i = 0; i < 3; i++) {

danube-client/src/test/java/com/danubemessaging/client/it/PartitionedBasicIT.java

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,10 @@
77
import com.danubemessaging.client.model.StreamMessage;
88
import org.junit.jupiter.api.Test;
99

10-
import java.time.Duration;
1110
import java.util.HashSet;
12-
import java.util.List;
1311
import java.util.Map;
1412
import java.util.Set;
13+
import java.util.concurrent.TimeUnit;
1514

1615
import static com.danubemessaging.client.it.TestHelpers.*;
1716
import static org.junit.jupiter.api.Assertions.*;
@@ -22,7 +21,7 @@
2221
*/
2322
class PartitionedBasicIT {
2423

25-
private void runPartitionedBasic(String topicPrefix, SubType subType) {
24+
private void runPartitionedBasic(String topicPrefix, SubType subType) throws Exception {
2625
DanubeClient client = newClient();
2726
String topic = uniqueTopic(topicPrefix);
2827
int partitions = 3;
@@ -43,21 +42,25 @@ private void runPartitionedBasic(String topicPrefix, SubType subType) {
4342
consumer.subscribe();
4443

4544
try {
46-
var publisher = consumer.receive();
45+
String[] expected = { "Hello Danube 1", "Hello Danube 2", "Hello Danube 3" };
46+
47+
// Attach collector BEFORE sending
48+
var collector = new TestHelpers.MessageCollector(expected.length);
49+
consumer.receive().subscribe(collector);
4750

4851
Thread.sleep(300);
4952

50-
String[] expected = {"Hello Danube 1", "Hello Danube 2", "Hello Danube 3"};
5153
for (String body : expected) {
5254
producer.send(body.getBytes(), Map.of());
5355
}
5456

55-
List<StreamMessage> messages = receiveMessages(publisher, expected.length, Duration.ofSeconds(10));
57+
assertTrue(collector.latch.await(10, TimeUnit.SECONDS),
58+
"Timeout: received " + collector.messages.size() + "/" + expected.length);
5659

5760
// Verify all payloads received
5861
Set<String> received = new HashSet<>();
5962
Set<String> partsSeen = new HashSet<>();
60-
for (StreamMessage msg : messages) {
63+
for (StreamMessage msg : collector.messages) {
6164
received.add(new String(msg.payload()));
6265
partsSeen.add(msg.messageId().topicName());
6366
consumer.ack(msg);
@@ -72,22 +75,19 @@ private void runPartitionedBasic(String topicPrefix, SubType subType) {
7275
String partName = topic + "-part-" + i;
7376
assertTrue(partsSeen.contains(partName), "missing partition: " + partName);
7477
}
75-
} catch (InterruptedException e) {
76-
Thread.currentThread().interrupt();
77-
fail("Interrupted");
7878
} finally {
7979
consumer.close();
8080
client.close();
8181
}
8282
}
8383

8484
@Test
85-
void partitionedBasicExclusive() {
85+
void partitionedBasicExclusive() throws Exception {
8686
runPartitionedBasic("/default/part_basic_excl", SubType.EXCLUSIVE);
8787
}
8888

8989
@Test
90-
void partitionedBasicShared() {
90+
void partitionedBasicShared() throws Exception {
9191
runPartitionedBasic("/default/part_basic_shared", SubType.SHARED);
9292
}
9393
}

danube-client/src/test/java/com/danubemessaging/client/it/ReliableDispatchIT.java

Lines changed: 8 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@
88
import com.danubemessaging.client.model.StreamMessage;
99
import org.junit.jupiter.api.Test;
1010

11-
import java.time.Duration;
1211
import java.util.Arrays;
1312
import java.util.Map;
1413
import java.util.concurrent.CountDownLatch;
@@ -51,20 +50,12 @@ private void runReliableBasic(String topicPrefix, SubType subType) throws Except
5150
Arrays.fill(blobData, (byte) 'D');
5251
int messageCount = 20;
5352

54-
var publisher = consumer.receive();
55-
56-
Thread.sleep(400);
57-
58-
for (int i = 0; i < messageCount; i++) {
59-
producer.send(blobData, Map.of());
60-
}
61-
62-
// Receive and ack inline
53+
// Attach subscriber BEFORE sending so the receive stream is ready
6354
AtomicInteger count = new AtomicInteger();
6455
AtomicReference<Throwable> error = new AtomicReference<>();
6556
CountDownLatch done = new CountDownLatch(1);
6657

67-
publisher.subscribe(new TestHelpers.MessageCollector(messageCount) {
58+
consumer.receive().subscribe(new TestHelpers.MessageCollector(messageCount) {
6859
@Override
6960
public void onNext(StreamMessage item) {
7061
try {
@@ -81,6 +72,12 @@ public void onNext(StreamMessage item) {
8172
}
8273
});
8374

75+
Thread.sleep(400);
76+
77+
for (int i = 0; i < messageCount; i++) {
78+
producer.send(blobData, Map.of());
79+
}
80+
8481
assertTrue(done.await(15, TimeUnit.SECONDS),
8582
"Timeout: received " + count.get() + "/" + messageCount);
8683

0 commit comments

Comments
 (0)